You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@subversion.apache.org by st...@apache.org on 2021/04/10 12:11:22 UTC

svn commit: r1888583 - /subversion/trunk/subversion/libsvn_subr/task.c

Author: stefan2
Date: Sat Apr 10 12:11:22 2021
New Revision: 1888583

URL: http://svn.apache.org/viewvc?rev=1888583&view=rev
Log:
Add multi-threaded execution to the task runner.

The basic execution logic is the same as for sequential execution but the
place of execution is different.  The main thread spawns off worker threads
as needed and processes the outputs sequentially such that it appears as
sequential albeit fast processing.

Access to memory pools and the task tree need to serialized using mutexes.
Also, some effort is put into assigning tasks to workers in way that maximizes
locality and minimizes interference / overhead between tasks.

* subversion/libsvn_subr/task.c
  (root_t): Add mutexes, flags and condition variables needed.
            Correct the docstring for RESULTS_POOL.
  (svn_task__t): Add pointer to quickly find the next unprocessed sub-tree.
  (link_new_task): Update the new pointer in svn_task__t.
  (next_unprocessed): New utility function.
  (unready_task): Use the utility to update new pointer in svn_task__t.

  (add_task): Sync access to tree. Notify workers of new tasks.
  (svn_task__add): Sync access to memory pool.
  (is_contented,
   set_processed_and_pick): New functions to intelligently pick a task to
                            work on.
  (enforce_sequential_consistency): New synchronization utility.                            
  (output_processed): Sync access to tree.
  (next_task,
   worker_cancelled,
   send_terminate,
   worker,
   worker_thread): New functions implementing the worker tasks.
  (wait_for_outputting_state,
   execute_concurrently): New functions implementing the foreground output
                          processing and controlling the worker tasks.
  (root_cleanup): New utility function.
  (svn_task__run): Initialize the synchronization structures and pick the
                   appropriate execution model.

Modified:
    subversion/trunk/subversion/libsvn_subr/task.c

Modified: subversion/trunk/subversion/libsvn_subr/task.c
URL: http://svn.apache.org/viewvc/subversion/trunk/subversion/libsvn_subr/task.c?rev=1888583&r1=1888582&r2=1888583&view=diff
==============================================================================
--- subversion/trunk/subversion/libsvn_subr/task.c (original)
+++ subversion/trunk/subversion/libsvn_subr/task.c Sat Apr 10 12:11:22 2021
@@ -23,6 +23,7 @@
 #include "private/svn_task.h"
 
 #include <assert.h>
+#include <apr_thread_proc.h>
 
 #include "private/svn_atomic.h"
 #include "private/svn_thread_cond.h"
@@ -35,9 +36,44 @@
  */
 typedef struct root_t
 {
+  /* Global mutex protecting the whole task tree.
+   * Any modification on the tree structure or task state requires
+   * serialization through this mutex.
+   * 
+   * In single-threaded execution, this will be a no-op dummy.
+   */
+  svn_mutex__t *mutex;
+
+  /* Used to portably implement a memory barrier in C. */
+  svn_mutex__t *memory_barrier_mutex;
+
+  /* Signals to waiting ("sleeping") worker threads that they need to wake
+   * up.  This may be due to new tasks being available or because the task
+   * runner is about to terminate.
+   *
+   * Waiting tasks must hold the MUTEX when entering the waiting state.
+   *
+   * NULL if execution is single-threaded.
+   */
+  svn_thread_cond__t *worker_wakeup;
+
+  /* Signals to the foreground thread that some tasks may have been processed
+   * and output process may commence.  However, there is no guarantee that
+   * any task actually completed nor that it is the one whose output needs
+   * to be processed next. 
+   *
+   * Waiting tasks must hold the MUTEX when entering the waiting state.
+   *
+   * NULL if execution is single-threaded.
+   */
+  svn_thread_cond__t *task_processed;
+
   /* The actual root task. */
   svn_task__t *task;
 
+  /* All allocations from TASK_POOL must be serialized through this mutex. */
+  svn_mutex__t *task_alloc_mutex;
+
   /* Pools "segregated" for reduced lock contention when multi-threading.
    * Ordered by lifetime of the objects allocated in them (long to short).
    */
@@ -54,10 +90,10 @@ typedef struct root_t
    */
   apr_pool_t *process_pool;
 
-  /* Allocate per-task results_t as well as the actual outputs here.
-   * Allocation will happen just before calling the processing function.
-   * Release the memory immediately afterwards, unless some actual output
-   * has been produced.
+  /* Allocate per-task results_t as well as the actual outputs from sub-pools
+   * of this.  Allocation will happen just before calling the processing
+   * function.  Release the memory immediately afterwards, unless some actual
+   * output has been produced.
    */
   apr_pool_t *results_pool;
 
@@ -65,6 +101,13 @@ typedef struct root_t
   svn_task__thread_context_constructor_t context_constructor;
   void *context_baton;
 
+  /* If TRUE, end task processing.  In multi-threaded execution, the main
+   * (= output) thread will set this upon error, cancellation or simply
+   * when all work is done.  Worker threads will check for it and terminate
+   * asap.
+   */
+  svn_atomic_t terminate;
+
 } root_t;
 
 /* Sub-structure of svn_task__t containing that task's processing output.
@@ -173,6 +216,13 @@ struct svn_task__t
    */
   svn_task__t *first_ready;
 
+  /* The first immidiate sub-task that has not been processed.  If this is
+   * NULL, they might still be unprocessed tasks deeper down the tree.
+   *
+   * Use this to efficiently find unprocessed tasks high up in the tree.
+   */
+  svn_task__t *first_unprocessed;
+
   /* Task state. */
 
   /* The callbacks to use. Never NULL. */
@@ -248,6 +298,9 @@ static svn_error_t *link_new_task(svn_ta
       parent->first_ready = task;
     }
 
+  if (task->parent->first_unprocessed == NULL)
+    task->parent->first_unprocessed = task;
+
   /* Test invariants for new tasks.
    *
    * We have to do it while still holding task tree mutex; background
@@ -257,6 +310,7 @@ static svn_error_t *link_new_task(svn_ta
   assert(task->last_sub == NULL);
   assert(task->next == NULL);
   assert(task->first_ready == task);
+  assert(task->first_unprocessed == NULL);
   assert(task->callbacks != NULL);
   assert(task->process_pool != NULL);
 
@@ -325,7 +379,8 @@ static svn_error_t *add_task(
   /* Catch construction snafus early in the process. */
   assert(callbacks != NULL);
 
-  SVN_ERR(alloc_task(&new_task, parent->root->task_pool));
+  SVN_MUTEX__WITH_LOCK(parent->root->task_alloc_mutex,
+                       alloc_task(&new_task, parent->root->task_pool));
 
   new_task->root = parent->root;
   new_task->process_baton = process_baton;
@@ -342,7 +397,14 @@ static svn_error_t *add_task(
   new_task->first_ready = new_task;
   new_task->callbacks = callbacks;
 
-  SVN_ERR(link_new_task(new_task));
+  SVN_MUTEX__WITH_LOCK(new_task->root->mutex, link_new_task(new_task));
+
+  /* Wake up all waiting worker threads:  There is work to do.
+   * If there is not enough work for all, some will go back to sleep.
+   *
+   * In single-threaded execution, no signalling is needed. */
+  if (new_task->root->worker_wakeup)
+    SVN_ERR(svn_thread_cond__broadcast(new_task->root->worker_wakeup));
 
   return SVN_NO_ERROR;
 }
@@ -357,7 +419,8 @@ svn_error_t *svn_task__add(
   void *output_baton)
 {
   callbacks_t *callbacks;
-  SVN_ERR(alloc_callbacks(&callbacks, current->root->task_pool));
+  SVN_MUTEX__WITH_LOCK(current->root->task_alloc_mutex,
+                       alloc_callbacks(&callbacks, current->root->task_pool));
 
   callbacks->process_func = process_func;
   callbacks->output_func = output_func;
@@ -438,6 +501,21 @@ static svn_task__t *next_ready(svn_task_
   return task;
 }
 
+/* Utility function that follows the chain of siblings and returns the
+ * first unprocessed task.
+ * 
+ * Returns TASK if TASK is unprocessed.  Returns NULL if all direct sub-
+ * tasks of TASK->PARENT are already being processed or have been completed.
+ */
+static svn_task__t *next_unprocessed(svn_task__t *task)
+{
+  for (; task; task = task->next)
+    if (task->first_ready == task)
+      break;
+
+  return task;
+}
+
 /* Mark TASK as no longer being unprocessed.
  * Call this before starting actual processing of TASK.
  */
@@ -476,11 +554,43 @@ static void unready_task(svn_task__t *ta
 
       parent->first_ready = first_ready;
     }
+
+  /* Update FIRST_PROCESSED as well.  Since this points only from parent to
+   * some immediate sub-task, no bubble-up action is required here. */
+  if (task->parent && task->parent->first_unprocessed == task)
+    task->parent->first_unprocessed = next_unprocessed(task->next);
 }
 
 
 /* Task processing and outputting results */
 
+/* Return TRUE if there are signs that another worker thread is working on
+ * the sub-tree of TASK or its next sibling.  Detection does not need to be
+ * perfect as this is just a hint to the scheduling strategy.  See
+ * set_processed_and_pick() for details.
+ *
+ * This function must be called with TASK->ROOT->MUTEX acquired.
+ */
+static svn_boolean_t is_contented(const svn_task__t *task)
+{
+  /* Assuming TASK has just been processed, the first sub-task should now
+   * be ready for execution.  Having no sub-tasks is also o.k.  If both
+   * pointers differ, some other worker already picked up a sub-task. */
+  if (task->first_sub != task->first_ready)
+    return TRUE;
+
+  /* If this whole sub-tree has been completed, check whether we can
+   * continue with the next sibling.  If that is already being processed,
+   * we would "step on somebody else's toes". */
+  if (   task->first_ready == NULL
+      && task->next
+      && task->next->first_ready == task->next)
+    return TRUE;
+
+  /* No signs of a clash found. */
+  return FALSE;
+}
+
 /* The forground output_processed() function will now consider TASK's
  * processing function to be completed.  Sub-tasks may still be pending. */
 static void set_processed(svn_task__t *task)
@@ -495,6 +605,83 @@ static svn_boolean_t is_processed(const
   return (task->process_pool == NULL);
 }
 
+/* Mark TASK as "processing completed" and pick a *NEXT_TASK to continue
+ * and mark it as "being processed".  If no good candidate has been found,
+ * set *NEXT_TASK to NULL.
+ *
+ * The heuristics in here are crucial for an efficient parallel traversal
+ * of deep, unbalanced and growing trees.  In their sequential form, many
+ * of the processing functions use parent context information to cache data
+ * and speed up processing in general.  Processing tasks in tree order
+ * takes full advantage of this.
+ *
+ * However, having N workers serving the same sub-tree, that advantage gets
+ * cut down propertionally and may turn into pure overhead.  Ideally, we
+ * want workers to be on distant sub-trees where they can process many tasks
+ * before stepping onto a different worker's "turf".  Whenever the latter
+ * happends, we crawl up the tree to find the most distant (=most high up)
+ * unprocessed task in the tree.  If that fails too, return NULL here and
+ * continue with the "canonical" next_task().
+ *
+ * Please note that we are free to execute tasks in *any* order and this
+ * here is just about being efficient.
+ *
+ * This function must be called with TASK->ROOT->MUTEX acquired.
+ */
+static svn_error_t *set_processed_and_pick(svn_task__t **next_task,
+                                           svn_task__t *task)
+{
+  set_processed(task);
+
+  /* Are we still alone in our sub-tree? */
+  if (is_contented(task))
+    {
+      /* Nope.
+       * Maybe there is some untouched sub-tree under one of our parents.
+       * If so, find the one the highest up in the tree.
+       *
+       * Note that we may not find any despite some "cousin" being a good
+       * option.  This is because we only walk up the ancestry line and
+       * check for immediate children there. */
+      while (task->parent && task->parent->first_unprocessed)
+        task = task->parent;
+
+      task = task->first_unprocessed;
+    }
+  else
+    {
+      /* Probably yes.
+       * Just pick the next task, continue at the parent as needed. */
+      while (task->first_ready == NULL && task->parent)
+        task = task->parent;
+
+      task = task->first_ready;
+    }
+
+  /* Atomic operation:  We need to mark the new task as "in process" while
+   * still holding the mutex. */
+  if (task)
+    unready_task(task);
+
+  *next_task = task;
+  return SVN_NO_ERROR;
+}
+
+/* Full memory barrier to make sure we see all data changes made by other
+ * cores up to this point.  ROOT provides any necessary synchronization
+ * objects.
+ */
+static svn_error_t *enforce_sequential_consistency(root_t *root)
+{
+  /* We only need GCC's "asm volatile" construct but that is not portable. */
+
+  /* Mutexes come with full memory barriers as a side-effect.  But because
+   * we use several mutexes for each single task, adding this mutex one,
+   * which is even never contested, adds only moderate overhead. */
+  SVN_MUTEX__WITH_LOCK(root->memory_barrier_mutex, SVN_NO_ERROR);
+  return SVN_NO_ERROR;
+}
+
 /* Process a single TASK within the given THREAD_CONTEXT.  It may add
  * sub-tasks but those need separate calls to this function to be processed.
  *
@@ -567,6 +754,22 @@ static svn_error_t *output_processed(
     {
       svn_pool_clear(iterpool);
 
+      /* When tasks are executed in background threads, we do have a
+       * potential memory ordering issue:  We may see a mix of old and new
+       * state in CURRENT, i.e. the "is processed" state (new) but without
+       * the updates to the other fields and sub-structures.
+       *
+       * The worker threads acquire a mutex before setting the "is processed"
+       * state and those act as full memory barriers, i.e. commit all prior
+       * changes to memory.  On the hardware level, this is enough to make
+       * sure the we read the updated data once we saw the state change.
+       *
+       * The compiler, however, does not know that this is (potentially)
+       * threaded code.  So, VERY technically, it is allowed to reorder
+       * operations and read fields of CURRENT *before* reading the state.
+       * This would no longer be sequentially consistent.  Prevent that. */
+      enforce_sequential_consistency(current->root);
+
       /* Post-order, i.e. dive into sub-tasks first.
        *
        * Note that the post-order refers to the task ordering and the output
@@ -626,7 +829,8 @@ static svn_error_t *output_processed(
                * with the next iteration. */
               svn_task__t *to_delete = current;
               current = to_delete->parent;
-              SVN_ERR(remove_task(to_delete));
+              SVN_MUTEX__WITH_LOCK(to_delete->root->mutex,
+                                   remove_task(to_delete));
 
               /* We have output all sub-nodes, including all partial results.
                * Therefore, the last used thing allocated in OUTPUT->POOL is
@@ -646,6 +850,290 @@ static svn_error_t *output_processed(
 
 /* Execution models */
 
+/* From ROOT, find the first unprocessed task - in pre-order - mark it as
+ * "in process" and return it in *TASK.  If no such task exists, wait for
+ * the ROOT->WORKER_WAKEUP condition and retry.
+ *
+ * If ROOT->TERMINATE is set, return NULL for *TASK.
+ *
+ * If the main thread is waiting on us to process tasks, this logic will
+ * implicitly pick that task.  So, by default, all workers start on those
+ * tasks that are immediately useful for the output processing.  Only later
+ * will contention detection bounce most of them off to other sub-tasks.
+ * However, there are still likely to come back to this once in a while and
+ * ensure that progress is made at the beginning of the tree and the output
+ * is not delayed much.
+ *
+ * This function must be called with ROOT->MUTEX acquired.
+ */
+static svn_error_t *next_task(svn_task__t **task, root_t *root)
+{
+  while (TRUE)
+    {
+      /* Spurious wakeups are being handled implicitly
+       * (check conditions and go back to sleep). */
+
+      /* Worker thread needs to terminate? */
+      if (svn_atomic_read(&root->terminate))
+        {
+          *task = NULL;
+          return SVN_NO_ERROR;
+        }
+
+      /* If there are unprocessed tasks, pick the first one. */
+      if (root->task->first_ready)
+        {
+          svn_task__t *current = root->task->first_ready;
+          unready_task(current);
+          *task = current;
+
+          return SVN_NO_ERROR;
+        }
+      
+      /* No task, no termination.  Wait for one of these to happen. */
+      SVN_ERR(svn_thread_cond__wait(root->worker_wakeup, root->mutex));
+    }
+
+  return SVN_NO_ERROR;
+}
+
+/* Cancellation function to be used within background threads.
+ * BATON is the root_t object.
+ *
+ * This simply checks for cancellation by the forground thread.
+ * Normal cancellation is handled by the output function and then simply
+ * indicated by flag.
+ * 
+ * Note that termination due to errors returned by other tasks will also
+ * be treated as a cancellation with the respective SVN_ERR_CANCELLED
+ * error being returned from the current tasks in all workers.
+ */
+static svn_error_t *worker_cancelled(void *baton)
+{
+  root_t *root = baton;
+  return svn_atomic_read(&root->terminate)
+       ? svn_error_create(SVN_ERR_CANCELLED, NULL, NULL)
+       : SVN_NO_ERROR;
+}
+
+/* Set the TERMINATE flag in ROOT and make sure all worker threads get the
+ * message.  The latter is required to actually terminate all workers once
+ * all tasks have been completed, because workers don't terminate themselves
+ * unless there is some internal error.
+ */
+static svn_error_t *send_terminate(root_t *root)
+{
+  svn_atomic_set(&root->terminate, TRUE);
+  return svn_thread_cond__broadcast(root->worker_wakeup);
+}
+
+/* Background worker processing any task in ROOT until termination has been
+ * signalled in ROOT.  Use SCRATCH_POOL for temporary allocations.
+ */
+static svn_error_t *worker(root_t *root, apr_pool_t *scratch_pool)
+{
+  apr_pool_t *iterpool = svn_pool_create(scratch_pool);
+  svn_task__t *task = NULL;
+
+  /* The context may be quite complex, so we use the ITERPOOL to clean up any
+   * memory that was used temporarily during context creation. */
+  void *thread_context = NULL;
+  if (root->context_constructor)
+    SVN_ERR(root->context_constructor(&thread_context, root->context_baton,
+                                      scratch_pool, iterpool));
+
+  /* Keep processing tasks until termination.
+   * If no tasks need processing, sleep until being signalled
+   * (new task or termination). */
+  while (!svn_atomic_read(&root->terminate))
+    {
+      svn_pool_clear(iterpool);
+      if (!task)
+        {
+          /* We did not pick a suitable task to continue with.
+           *
+           * Make sure the output task is not sleeping (we may have processed
+           * many tasks in a large sub-tree without telling the forground
+           * thread), so it may tell us to continue or terminate. */
+          SVN_ERR(svn_thread_cond__signal(root->task_processed));
+
+          /* Pick the next task in pre-order.  If none exists, sleep until
+           * woken up. */
+          SVN_MUTEX__WITH_LOCK(root->mutex, next_task(&task, root));
+
+          /* None existed, we slept, got woken up and there still was nothing.
+           * This implies termination. */
+          if (!task)
+            break;
+        }
+
+      /* Process this TASK and pick a suitable next one, if available. */
+      process(task, thread_context, worker_cancelled, root, iterpool);
+      SVN_MUTEX__WITH_LOCK(root->mutex,
+                           set_processed_and_pick(&task, task));
+    }
+
+  /* Cleanup. */
+  svn_pool_destroy(iterpool);
+
+  return SVN_NO_ERROR;
+}
+
+/* The plain APR thread around the worker function.
+ * DATA is the root_t object to work on. */
+static void * APR_THREAD_FUNC
+worker_thread(apr_thread_t *thread, void *data)
+{
+  /* Each thread uses a separate single-threaded pool tree for minimum overhead
+   */
+  apr_pool_t *pool = apr_allocator_owner_get(svn_pool_create_allocator(FALSE));
+
+  apr_status_t result = APR_SUCCESS;
+  svn_error_t *err = worker(data, pool);
+  if (err)
+    {
+      result = err->apr_err;
+      svn_error_clear(err);
+    }
+  
+  svn_pool_destroy(pool);
+
+  /* End thread explicitly to prevent APR_INCOMPLETE return codes in
+     apr_thread_join(). */
+  apr_thread_exit(thread, result);
+  return NULL;
+}
+
+/* If TASK has not been processed, yet, wait for it.  Before waiting for
+ * the "task processed" signal, start a new worker thread, allocated in a
+ * THREAD_SAFE_POOL, and add it to the array of THREADS.
+ * 
+ * So, everytime we run out of processing results, we add a new worker.
+ * This results in a slightly delayed spawning of new threads.  The total
+ * number of worker threads is limited to THREAD_COUNT.
+ *
+ * This function must be called with TASK->ROOT->MUTEX acquired.
+ */
+static svn_error_t *wait_for_outputting_state(
+  svn_task__t *task,
+  apr_int32_t thread_count,
+  apr_array_header_t *threads,
+  apr_pool_t *thread_safe_pool)
+{
+  root_t* root = task->root;
+  while (TRUE)
+    {
+      if (is_processed(task))
+        return SVN_NO_ERROR;
+
+      /* Maybe spawn another worker thread because there are waiting tasks.
+       */
+      if (thread_count > threads->nelts)
+        {
+          apr_thread_t *thread;
+          apr_status_t status = apr_thread_create(&thread, NULL,
+                                                  worker_thread,
+                                                  root,
+                                                  thread_safe_pool);
+          if (status)
+            return svn_error_wrap_apr(status,
+                                      "Creating worker thread failed");
+
+          APR_ARRAY_PUSH(threads, apr_thread_t *) = thread;
+        }
+
+      /* Efficiently wait for tasks to (maybe) be completed. */
+      SVN_ERR(svn_thread_cond__wait(root->task_processed, root->mutex));
+    }
+
+  return SVN_NO_ERROR;
+}
+
+/* Run the (root) TASK to completion, including dynamically added sub-tasks.
+ * Use up to THREAD_COUNT worker threads for that.
+ *
+ * Pass CANCEL_FUNC and CANCEL_BATON only into the output function callbacks.
+ * Pass the RESULT_POOL into the task output functions and use SCRATCH_POOL
+ * for everything else (unless covered by task pools).
+ */
+static svn_error_t *execute_concurrently(
+  svn_task__t *task,
+  apr_int32_t thread_count,
+  svn_cancel_func_t cancel_func,
+  void *cancel_baton,
+  apr_pool_t *result_pool,
+  apr_pool_t *scratch_pool)
+{
+  int i;
+  svn_task__t *current = task;
+  root_t *root = task->root;
+  svn_error_t *task_err = SVN_NO_ERROR;
+  svn_error_t *sync_err = SVN_NO_ERROR;
+  apr_pool_t *iterpool = svn_pool_create(scratch_pool);
+  apr_array_header_t *threads = apr_array_make(scratch_pool, thread_count,
+                                               sizeof(apr_thread_t *));
+
+  /* We need a thread-safe-pool to create the actual thread objects. */
+  apr_pool_t *thread_safe_pool = svn_pool_create(root->results_pool);
+
+  /* Main execution loop */ 
+  while (current && !task_err)
+    {
+      svn_pool_clear(iterpool);
+
+      /* Spawns worker thread as needed.
+       *
+       * Acquiring the mutex also acts as a full memory barrier such that we
+       * will see updates to task states.  Since we set_processed() last,
+       * all other information relevant to the task will be valid, too. */
+      SVN_MUTEX__WITH_LOCK(root->mutex,
+                           wait_for_outputting_state(current, thread_count,
+                                                     threads,
+                                                     thread_safe_pool));
+
+      /* Crawl processed tasks and output results until we exhaust processed
+       * tasks. */
+      task_err = output_processed(&current,
+                                  cancel_func, cancel_baton,
+                                  result_pool, iterpool);
+    }
+  
+  /* Tell all worker threads to terminate. */
+  SVN_MUTEX__WITH_LOCK(root->mutex, send_terminate(root));
+
+  /* Wait for all threads to terminate. */
+  for (i = 0; i < threads->nelts; ++i)
+    {
+      apr_thread_t *thread = APR_ARRAY_IDX(threads, i, apr_thread_t *);
+      apr_status_t retval;
+      apr_status_t sync_status = apr_thread_join(&retval, thread);
+      
+      if (retval != APR_SUCCESS)
+        sync_err = svn_error_compose_create(sync_err,
+                        svn_error_wrap_apr(retval,
+                                          "Worker thread returned error"));
+        
+      if (sync_status != APR_SUCCESS)
+        sync_err = svn_error_compose_create(sync_err,
+                        svn_error_wrap_apr(sync_status,
+                                          "Worker thread join error"));
+    }
+  
+  /* Explicitly release any (other) error.  Leave pools as they are.
+   * This is important in the case of early exists due to error returns.
+   *
+   * However, don't do it if something went wrong while waiting for worker
+   * threads to terminate.  They might still be doing something and might
+   * crash at any time.  Doing nothing here might increase our chance for
+   * the SYNC_ERR to eventually be reported to the user. */
+  if (!sync_err)
+    clear_errors(task);
+  svn_pool_destroy(iterpool);
+
+  /* Get rid of all remaining tasks. */
+  return svn_error_trace(svn_error_compose_create(sync_err, task_err));
+}
+
 /* Run the (root) TASK to completion, including dynamically added sub-tasks.
  * Pass CANCEL_FUNC and CANCEL_BATON directly into the task callbacks.
  * Pass the RESULT_POOL into the task output functions and use SCRATCH_POOL
@@ -703,6 +1191,18 @@ static svn_error_t *execute_serially(
 
 /* Root data structure */
 
+/* Pool cleanup function to make sure we free the root pools (allocators) */
+static apr_status_t
+root_cleanup(void *baton)
+{
+  root_t *root = baton;
+  svn_pool_destroy(root->task_pool);
+  svn_pool_destroy(root->process_pool);
+  svn_pool_destroy(root->results_pool);
+
+  return APR_SUCCESS;
+}
+
 svn_error_t *svn_task__run(
   apr_int32_t thread_count,
   svn_task__process_func_t process_func,
@@ -718,15 +1218,54 @@ svn_error_t *svn_task__run(
 {
   root_t *root = apr_pcalloc(scratch_pool, sizeof(*root));
 
+  /* Pick execution model.
+   *
+   * Note that multi-threading comes with significant overheads and should
+   * not be used unless requested. */
+#if APR_HAS_THREADS
+  svn_boolean_t threaded_execution = thread_count > 1;
+#else
+  svn_boolean_t threaded_execution = FALSE;
+#endif
+
   /* Allocation on stack is fine as this function will not exit before
    * all task processing has been completed. */
   callbacks_t callbacks;
 
-  /* For now, we only have single-threaded execution.
-   * So, no special consideration required regarding pools & serialization.*/
-  root->task_pool = scratch_pool;
-  root->process_pool = scratch_pool;
-  root->results_pool = scratch_pool;
+  /* The mutexes must always be constructed.  But we only need their light-
+   * weight versions in single-threaded execution. */
+  SVN_ERR(svn_mutex__init(&root->mutex, threaded_execution, scratch_pool));
+  SVN_ERR(svn_mutex__init(&root->memory_barrier_mutex, threaded_execution,
+                          scratch_pool));
+  SVN_ERR(svn_mutex__init(&root->task_alloc_mutex, threaded_execution,
+                          scratch_pool));
+
+  /* Inter-thread signalling (condition variables) are only needed when
+   * we use worker threads. */
+  if (threaded_execution)
+    {
+      SVN_ERR(svn_thread_cond__create(&root->task_processed, scratch_pool));
+      SVN_ERR(svn_thread_cond__create(&root->worker_wakeup, scratch_pool));
+    }
+
+  /* Permanently allocating a pool for each task is too expensive.
+   * So, we will allocate directly from this pool - which requires
+   * serialization of each apr_alloc() call.  Therefore, we don't need
+   * the added overhead of allocator serialization here. */
+  root->task_pool =
+    apr_allocator_owner_get(svn_pool_create_allocator(FALSE));
+
+  /* The sub-pools of these will used in a single thread each but created
+   * from different worker threads.  Using allocator serialization is good
+   * enough for that. */
+  root->process_pool =
+    apr_allocator_owner_get(svn_pool_create_allocator(threaded_execution));
+  root->results_pool =
+    apr_allocator_owner_get(svn_pool_create_allocator(threaded_execution));
+
+  /* Be sure to clean the root pools up afterwards. */
+  apr_pool_cleanup_register(scratch_pool, root, root_cleanup,
+                            apr_pool_cleanup_null);
 
   callbacks.process_func = process_func;
   callbacks.output_func = output_func;
@@ -741,11 +1280,21 @@ svn_error_t *svn_task__run(
 
   root->context_baton = context_baton;
   root->context_constructor = context_constructor;
+  root->terminate = FALSE;
 
-  /* For now, there is only single-threaded execution. */
-  SVN_ERR(execute_serially(root->task,
-                           cancel_func, cancel_baton,
-                           result_pool, scratch_pool));
+  /* Go, go, go! */
+  if (threaded_execution)
+    {
+      SVN_ERR(execute_concurrently(root->task, thread_count,
+                                   cancel_func, cancel_baton,
+                                   result_pool, scratch_pool));
+    }
+  else
+   {
+     SVN_ERR(execute_serially(root->task,
+                              cancel_func, cancel_baton,
+                              result_pool, scratch_pool));
+   }    
 
   return SVN_NO_ERROR;
 }