You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tvm.apache.org by kp...@apache.org on 2022/06/13 17:23:01 UTC

[tvm] branch main updated: [Hexagon] Add HexagonThreadManager (#11653)

This is an automated email from the ASF dual-hosted git repository.

kparzysz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git


The following commit(s) were added to refs/heads/main by this push:
     new 76b9ce9b1f [Hexagon] Add HexagonThreadManager (#11653)
76b9ce9b1f is described below

commit 76b9ce9b1f7d2b7e64b4b9c9d456a02b8a010473
Author: Adam Straw <as...@octoml.ai>
AuthorDate: Mon Jun 13 10:22:54 2022 -0700

    [Hexagon] Add HexagonThreadManager (#11653)
    
    * Adding initial threadmanager class
    
    * Fixed compile errors
    
    * Moving constant defines inside class
    
    * Updating qurt includes
    
    * use default scope for hexagon buffers
    
    * Updating buffer allocations
    
    * Fixed bug where array of pointers treated as array of structs
    
    * - Updated HexagonDeviceAPI to use HexagonThreadManager
    - Updated HexagonThreadManager interface to use TVMStreams
    - Added second `Dispatch` interface in thread manager to use PackedFuncs
    - Updated thread manager to use vector for dynamic semaphore allocation
    - Added "#if defined(__hexagon__)" in several places to prevent compilation errors
    
    * Bug fixes + interface addition + basic thread tests
     - Fixed GetStreams not returning the streams properly
     - Added missing semaphore cleanup to prevent qurt kernel resource leakage
     - new interface functions:
       - Start() : now all worker threads are blocked on initialization until ThreadManager->Start() is called
       - WaitOnThreads() : blocking call which waits until all worker thread queues are empty
     - added extra debug logging
     - Two new basic thread tests working
    
    * Adding initial ThreadManager tests
    
    * HexagonThreadManager tests and refactor
    
    * remove stack / pipe size member vars
    
    * init pointers in the header file
    
    * move all mem allocs to SpawnThreads
    
    * start_semaphore as object instead of pointer
    
    * fix bug with WaitOnThreads deadlock + Wait/Signal off by one error
    
    * add / refactor Signal / Wait tests
    
    * add SyncFromTo test cases
    
    * add Dispatch test cases
    
    * add pipe fill and overflow cases
    
    * Updating dispatch to return bool and fix pipe overflow problem
    
    * change around min / max values for stack / pipe
    
    * integrate pipe fill / overflow tests back into HTM test suite
    
    * use HexagonBuffer
    
    * assert if stack / pipe sizes fall below min
    
    * Changed semaphore vector to store pointers, not structs (fixes vector capacity adjustment invalidating in-use addresses).
    
    * add producer consumer, thread order test cases
    
    * change to unordered_map for semaphores and remove PreallocateSyncs
    
    * tests running on device
    
    * code cleanup for compile warnings
    
    * remove #if defined(__hexagon__) guards
    
    * copyright, format, lint
    
    * add hexagon buffer map class
    
    * remove redundant thread manager tests
    
    * revert Hex Dev API changes for threading
    
    * add comments; remove untested code to dispatch / wrap a packed func
    
    * pass pipe address and not HTM pointer to thread context
    
    * rename to HexagonBufferManager
    
    * cleanup ahead of PR
    
    * use DLOG(INFO)
    
    * refactor GetStreamHandles to return a vector by value
    
    * adjust HexagonBufferManager methods; use thread_manager file names
    
    * style guidelines and debug prints
    
    * reinterpret cast for TVMStreamHandle
    
    * end member variables with underscore
    
    Co-authored-by: Joseph McMahan <jm...@octoml.ai>
---
 src/runtime/hexagon/hexagon_buffer_manager.h       |  81 ++++++
 src/runtime/hexagon/hexagon_device_api.cc          |  29 +-
 src/runtime/hexagon/hexagon_device_api.h           |  23 +-
 src/runtime/hexagon/hexagon_thread_manager.cc      | 291 ++++++++++++++++++
 src/runtime/hexagon/hexagon_thread_manager.h       | 194 ++++++++++++
 .../hexagon/hexagon_thread_manager_tests.cc        | 324 +++++++++++++++++++++
 6 files changed, 901 insertions(+), 41 deletions(-)

diff --git a/src/runtime/hexagon/hexagon_buffer_manager.h b/src/runtime/hexagon/hexagon_buffer_manager.h
new file mode 100644
index 0000000000..658a39fac8
--- /dev/null
+++ b/src/runtime/hexagon/hexagon_buffer_manager.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_
+#define TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_
+
+#include <tvm/runtime/logging.h>
+
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "hexagon_buffer.h"
+
+namespace tvm {
+namespace runtime {
+namespace hexagon {
+
+/*!
+ * \brief Owns HexagonBuffer objects, keyed by the data pointer each one exposes via
+ * `GetPointer()`. Buffers still held when the manager is destroyed are released with it.
+ */
+class HexagonBufferManager {
+ public:
+  /*!
+   * \brief Free a HexagonBuffer.
+   * \param ptr Address of the HexagonBuffer as returned by `AllocateHexagonBuffer`.
+   * \note Fails a CHECK if `ptr` is unknown or has already been freed.
+   */
+  void FreeHexagonBuffer(void* ptr) {
+    auto it = hexagon_buffer_map_.find(ptr);
+    CHECK(it != hexagon_buffer_map_.end())
+        << "Attempt made to free unknown or already freed dataspace allocation";
+    CHECK(it->second != nullptr);
+    // Erasing the entry destroys the owning unique_ptr and therefore the HexagonBuffer.
+    hexagon_buffer_map_.erase(it);
+  }
+
+  /*!
+   * \brief Allocate a HexagonBuffer and take ownership of it.
+   * \param args Templated arguments to pass through to HexagonBuffer constructor.
+   * \returns The buffer's data pointer; pass it to `FreeHexagonBuffer` to release the buffer.
+   */
+  template <typename... Args>
+  void* AllocateHexagonBuffer(Args&&... args) {
+    auto buf = std::make_unique<HexagonBuffer>(std::forward<Args>(args)...);
+    void* ptr = buf->GetPointer();
+    // If the key were already present, insert() would destroy `buf` and the returned pointer
+    // would dangle, so treat a duplicate key as an internal error.
+    bool inserted = hexagon_buffer_map_.insert({ptr, std::move(buf)}).second;
+    CHECK(inserted) << "HexagonBuffer data pointer is already tracked by this manager";
+    return ptr;
+  }
+
+  //! \brief Returns 1 if `ptr` is the data pointer of a tracked HexagonBuffer, otherwise 0.
+  size_t count(void* ptr) const { return hexagon_buffer_map_.count(ptr); }
+
+  //! \brief Returns the tracked HexagonBuffer whose data pointer is `ptr`, or nullptr if none.
+  HexagonBuffer* find(void* ptr) const {
+    auto it = hexagon_buffer_map_.find(ptr);
+    return it != hexagon_buffer_map_.end() ? it->second.get() : nullptr;
+  }
+
+ private:
+  //! \brief Contains the HexagonBuffer objects managed by this class.
+  std::unordered_map<void*, std::unique_ptr<HexagonBuffer>> hexagon_buffer_map_;
+};
+
+}  // namespace hexagon
+}  // namespace runtime
+}  // namespace tvm
+
+#endif  // TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_
diff --git a/src/runtime/hexagon/hexagon_device_api.cc b/src/runtime/hexagon/hexagon_device_api.cc
index c9c1586008..92a7b22784 100644
--- a/src/runtime/hexagon/hexagon_device_api.cc
+++ b/src/runtime/hexagon/hexagon_device_api.cc
@@ -32,7 +32,6 @@
 #include <cstring>
 
 #include "../workspace_pool.h"
-#include "hexagon_buffer.h"
 #include "hexagon_common.h"
 
 namespace tvm {
@@ -74,14 +73,14 @@ void* HexagonDeviceAPI::AllocDataSpace(Device dev, int ndim, const int64_t* shap
   }
 
   if (ndim == 0) {
-    return AllocateHexagonBuffer(typesize, alignment, mem_scope);
+    return hexbuffs.AllocateHexagonBuffer(typesize, alignment, mem_scope);
   } else if (ndim == 1) {
     size_t nbytes = shape[0] * typesize;
-    return AllocateHexagonBuffer(nbytes, alignment, mem_scope);
+    return hexbuffs.AllocateHexagonBuffer(nbytes, alignment, mem_scope);
   } else if (ndim == 2) {
     size_t nallocs = shape[0];
     size_t nbytes = shape[1] * typesize;
-    return AllocateHexagonBuffer(nallocs, nbytes, alignment, mem_scope);
+    return hexbuffs.AllocateHexagonBuffer(nallocs, nbytes, alignment, mem_scope);
   } else {
     LOG(FATAL) << "Hexagon Device API supports only 1d and 2d allocations, but received ndim = "
                << ndim;
@@ -97,13 +96,13 @@ void* HexagonDeviceAPI::AllocDataSpace(Device dev, size_t nbytes, size_t alignme
   if (alignment < kHexagonAllocAlignment) {
     alignment = kHexagonAllocAlignment;
   }
-  return AllocateHexagonBuffer(nbytes, alignment, String("global"));
+  return hexbuffs.AllocateHexagonBuffer(nbytes, alignment, String("global"));
 }
 
 void HexagonDeviceAPI::FreeDataSpace(Device dev, void* ptr) {
   CHECK(ptr) << "buffer pointer is null";
   CHECK(IsValidDevice(dev)) << "dev.device_type: " << dev.device_type;
-  FreeHexagonBuffer(ptr);
+  hexbuffs.FreeHexagonBuffer(ptr);
 }
 
 // WorkSpace: runtime allocations for Hexagon
@@ -119,7 +118,7 @@ void* HexagonDeviceAPI::AllocWorkspace(Device dev, size_t size, DLDataType type_
 
 void HexagonDeviceAPI::FreeWorkspace(Device dev, void* data) {
   CHECK(IsValidDevice(dev)) << "dev.device_type: " << dev.device_type;
-  CHECK(hexagon_buffer_map_.count(data) != 0)
+  CHECK(hexbuffs.count(data) != 0)
       << "Attempt made to free unknown or already freed workspace allocation";
   dmlc::ThreadLocalStore<HexagonWorkspacePool>::Get()->FreeWorkspace(dev, data);
 }
@@ -143,13 +142,7 @@ void HexagonDeviceAPI::CopyDataFromTo(DLTensor* from, DLTensor* to, TVMStreamHan
   CHECK_EQ(to->byte_offset, 0);
   CHECK_EQ(GetDataSize(*from), GetDataSize(*to));
 
-  auto lookup_hexagon_buffer = [this](void* ptr) -> HexagonBuffer* {
-    auto it = this->hexagon_buffer_map_.find(ptr);
-    if (it != this->hexagon_buffer_map_.end()) {
-      return it->second.get();
-    }
-    return nullptr;
-  };
+  auto lookup_hexagon_buffer = [this](void* ptr) -> HexagonBuffer* { return hexbuffs.find(ptr); };
 
   HexagonBuffer* hex_from_buf = lookup_hexagon_buffer(from->data);
   HexagonBuffer* hex_to_buf = lookup_hexagon_buffer(to->data);
@@ -172,14 +165,6 @@ void HexagonDeviceAPI::CopyDataFromTo(const void* from, size_t from_offset, void
   memcpy(static_cast<char*>(to) + to_offset, static_cast<const char*>(from) + from_offset, size);
 }
 
-void HexagonDeviceAPI::FreeHexagonBuffer(void* ptr) {
-  auto it = hexagon_buffer_map_.find(ptr);
-  CHECK(it != hexagon_buffer_map_.end())
-      << "Attempt made to free unknown or already freed dataspace allocation";
-  CHECK(it->second != nullptr);
-  hexagon_buffer_map_.erase(it);
-}
-
 TVM_REGISTER_GLOBAL("device_api.hexagon.mem_copy").set_body([](TVMArgs args, TVMRetValue* rv) {
   void* dst = args[0];
   void* src = args[1];
diff --git a/src/runtime/hexagon/hexagon_device_api.h b/src/runtime/hexagon/hexagon_device_api.h
index 6f65bf4027..4da12e35fb 100644
--- a/src/runtime/hexagon/hexagon_device_api.h
+++ b/src/runtime/hexagon/hexagon_device_api.h
@@ -30,6 +30,7 @@
 #include <vector>
 
 #include "hexagon_buffer.h"
+#include "hexagon_buffer_manager.h"
 
 namespace tvm {
 namespace runtime {
@@ -72,7 +73,7 @@ class HexagonDeviceAPI final : public DeviceAPI {
    */
   void* AllocWorkspace(Device dev, size_t size, DLDataType type_hint) final;
 
-  //! Erase from tracked hexagon_buffer_map and free
+  //! Erase from HexagonBufferManager and free
   void FreeWorkspace(Device dev, void* data) final;
 
   /*!
@@ -127,18 +128,6 @@ class HexagonDeviceAPI final : public DeviceAPI {
                       TVMStreamHandle stream) final;
 
  private:
-  /*! \brief Helper to allocate a HexagonBuffer and register the result
-   *  in the owned buffer map.
-   *  \return Raw data storage managed by the hexagon buffer
-   */
-  template <typename... Args>
-  void* AllocateHexagonBuffer(Args&&... args) {
-    auto buf = std::make_unique<HexagonBuffer>(std::forward<Args>(args)...);
-    void* ptr = buf->GetPointer();
-    hexagon_buffer_map_.insert({ptr, std::move(buf)});
-    return ptr;
-  }
-
   /*! \brief Helper to check if the device type is valid for the Hexagon Device API
    *  \return Boolean indicating whether the device type is valid
    */
@@ -148,12 +137,8 @@ class HexagonDeviceAPI final : public DeviceAPI {
            (DLDeviceType(dev.device_type) == kDLCPU);
   }
 
-  /*! \brief Helper to free a HexagonBuffer and unregister the result
-   *  from the owned buffer map.
-   */
-  void FreeHexagonBuffer(void* ptr);
-  //! Lookup table for the HexagonBuffer managing an allocation.
-  std::unordered_map<void*, std::unique_ptr<HexagonBuffer>> hexagon_buffer_map_;
+  //! \brief Manages underlying HexagonBuffer allocations
+  HexagonBufferManager hexbuffs;
 };
 }  // namespace hexagon
 }  // namespace runtime
diff --git a/src/runtime/hexagon/hexagon_thread_manager.cc b/src/runtime/hexagon/hexagon_thread_manager.cc
new file mode 100644
index 0000000000..5d67b142e5
--- /dev/null
+++ b/src/runtime/hexagon/hexagon_thread_manager.cc
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "hexagon_thread_manager.h"
+
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+
+namespace tvm {
+namespace runtime {
+namespace hexagon {
+
+HexagonThreadManager::HexagonThreadManager(unsigned num_threads, unsigned thread_stack_size_bytes,
+                                           unsigned thread_pipe_size_words) {
+  // Note: could technically manage more software threads than allowable hardware threads, but
+  // there is no system constant defined in the qurt libs for that maximum.
+  CHECK(num_threads);
+  CHECK_LE(num_threads, QURT_MAX_HTHREAD_LIMIT);
+  nthreads_ = num_threads;
+
+  CHECK_GE(thread_stack_size_bytes, MIN_STACK_SIZE_BYTES);
+  CHECK_LE(thread_stack_size_bytes, MAX_STACK_SIZE_BYTES);
+
+  CHECK_GE(thread_pipe_size_words, MIN_PIPE_SIZE_WORDS);
+  CHECK_LE(thread_pipe_size_words, MAX_PIPE_SIZE_WORDS);
+
+  DLOG(INFO) << "Spawning threads";
+  SpawnThreads(thread_stack_size_bytes, thread_pipe_size_words);
+
+  // Initially, block all threads until we get the Start() call: the first command every thread
+  // receives is a wait on `start_semaphore_`, which starts with a count of zero.
+  qurt_sem_init_val(&start_semaphore_, 0);
+  for (unsigned i = 0; i < nthreads_; i++) {
+    // The pipes were just created empty with room for at least MIN_PIPE_SIZE_WORDS commands, so
+    // this must succeed; a silent failure would let the thread run ahead of Start().
+    bool queued = Dispatch(reinterpret_cast<TVMStreamHandle>(i), thread_wait, &start_semaphore_);
+    CHECK(queued) << "Failed to dispatch start wait to thread " << i;
+  }
+}
+
+HexagonThreadManager::~HexagonThreadManager() {
+  // Open the start gate if Start() was never called; otherwise the workers would stay parked on
+  // `start_semaphore_` and never reach the exit commands queued below.
+  if (qurt_sem_get_val(&start_semaphore_) == 0) {
+    Start();
+  }
+  DLOG(INFO) << "Threads started";
+
+  // Queue an exit command (status 0) on every thread, retrying while a pipe is full.
+  for (unsigned idx = 0; idx < nthreads_; ++idx) {
+    TVMStreamHandle handle = reinterpret_cast<TVMStreamHandle>(idx);
+    while (!Dispatch(handle, thread_exit, nullptr)) {
+      // pipe full: spin until the worker drains a slot
+    }
+  }
+  DLOG(INFO) << "Threads exited";
+
+  // Join every worker; a thread that has already exited returns immediately. The exit status is
+  // not inspected.
+  int exit_status;
+  for (qurt_thread_t tid : threads_) {
+    qurt_thread_join(tid, &exit_status);
+  }
+  DLOG(INFO) << "Threads joined";
+
+  // Tear down the start gate and every Signal/Wait semaphore (those were malloc'd).
+  qurt_sem_destroy(&start_semaphore_);
+  for (auto& entry : semaphores_) {
+    qurt_sem_destroy(entry.second);
+    free(entry.second);
+  }
+  DLOG(INFO) << "Semaphores destroyed";
+
+  // Release each pipe and the per-thread context allocated in SpawnThreads.
+  for (unsigned idx = 0; idx < nthreads_; ++idx) {
+    qurt_pipe_destroy(&pipes_[idx]);
+    delete contexts_[idx];
+  }
+  DLOG(INFO) << "Pipes and contexts deleted";
+
+  // Finally return the stack and pipe storage to the buffer manager.
+  hexbuffs_.FreeHexagonBuffer(stack_buffer_);
+  hexbuffs_.FreeHexagonBuffer(pipe_buffer_);
+  DLOG(INFO) << "Buffers freed";
+}
+
+void HexagonThreadManager::SpawnThreads(unsigned thread_stack_size_bytes,
+                                        unsigned thread_pipe_size_words) {
+  // Allocate the stacks of all threads as one block; thread i uses the i-th slice.
+  stack_buffer_ = hexbuffs_.AllocateHexagonBuffer(thread_stack_size_bytes * nthreads_,
+                                                  MEM_ALIGNMENT, String("global"));
+  // Allocate the pipe (command queue) storage of all threads as one block, sliced per thread.
+  unsigned thread_pipe_size_bytes = thread_pipe_size_words * sizeof(qurt_pipe_data_t);
+  pipe_buffer_ = hexbuffs_.AllocateHexagonBuffer(thread_pipe_size_bytes * nthreads_, MEM_ALIGNMENT,
+                                                 String("global"));
+
+  // Sized exactly once: the addresses of `pipes_` elements are handed to the threads below, so
+  // these vectors must not reallocate afterwards.
+  threads_.resize(nthreads_);
+  pipes_.resize(nthreads_);
+  contexts_.resize(nthreads_);
+
+  DLOG(INFO) << "Buffers allocated";
+
+  // First, create pipe resources for all threads
+  char* next_pipe_start = static_cast<char*>(pipe_buffer_);
+  for (unsigned i = 0; i < nthreads_; i++) {
+    qurt_pipe_attr_t pipe_attr;
+    qurt_pipe_attr_init(&pipe_attr);
+    qurt_pipe_attr_set_buffer(&pipe_attr, reinterpret_cast<qurt_pipe_data_t*>(next_pipe_start));
+    next_pipe_start += thread_pipe_size_bytes;
+    qurt_pipe_attr_set_buffer_partition(&pipe_attr, QURT_PIPE_ATTR_MEM_PARTITION_RAM);
+    qurt_pipe_attr_set_elements(&pipe_attr, thread_pipe_size_words);
+
+    // create the pipe
+    int rc = qurt_pipe_init(&pipes_[i], &pipe_attr);
+    CHECK_EQ(rc, QURT_EOK);
+  }
+
+  DLOG(INFO) << "Pipes created";
+
+  // Create all threads
+  char* next_stack_start = static_cast<char*>(stack_buffer_);
+  for (unsigned i = 0; i < nthreads_; i++) {
+    // create and initialize the thread attr
+    qurt_thread_attr_t thread_attr;
+    char name[32];
+    qurt_thread_attr_init(&thread_attr);
+    qurt_thread_attr_set_stack_addr(&thread_attr, next_stack_start);
+    qurt_thread_attr_set_stack_size(&thread_attr, thread_stack_size_bytes);
+    // `i` is unsigned, so it is formatted with %u to match the argument type.
+    snprintf(name, sizeof(name), "thread %u", i);
+    qurt_thread_attr_set_name(&thread_attr, name);
+    next_stack_start += thread_stack_size_bytes;
+
+    // create the thread; its context is deleted by the destructor
+    contexts_[i] = new ThreadContext(&pipes_[i], i);
+    int rc = qurt_thread_create(&threads_[i], &thread_attr, thread_main, contexts_[i]);
+    CHECK_EQ(rc, QURT_EOK);
+  }
+
+  DLOG(INFO) << "Threads created";
+}
+
+const std::vector<TVMStreamHandle> HexagonThreadManager::GetStreamHandles() {
+  // A stream handle is just the thread's index into `threads_`, carried in a pointer-typed value.
+  std::vector<TVMStreamHandle> handles(nthreads_);
+  for (unsigned idx = 0; idx < nthreads_; ++idx) {
+    handles[idx] = reinterpret_cast<TVMStreamHandle>(idx);
+  }
+  return handles;
+}
+
+bool HexagonThreadManager::Dispatch(TVMStreamHandle stream, voidfunc f, void* args) {
+  // Stream handles are thread indices (see GetStreamHandles). Converting through uintptr_t keeps
+  // the pointer-to-integer cast well-formed even where pointers are wider than `unsigned`.
+  unsigned thread = static_cast<unsigned>(reinterpret_cast<uintptr_t>(stream));
+  CHECK_LT(thread, nthreads_) << "Invalid stream handle " << thread;
+  DLOG(INFO) << "Dispatching to stream " << thread;
+  // On a successful send the receiving thread takes ownership of the Command and deletes it.
+  Command* cmd = new Command(f, args);
+  qurt_pipe_data_t msg = reinterpret_cast<qurt_pipe_data_t>(cmd);
+  if (qurt_pipe_try_send(&pipes_[thread], msg) == 0) {
+    return true;
+  }
+  // The pipe is full, so no thread will ever receive (and free) this Command. Free it here, or
+  // every failed attempt in a caller's retry loop would leak one allocation.
+  delete cmd;
+  return false;
+}
+
+void HexagonThreadManager::Start() {
+  // Each worker's first command is a wait on `start_semaphore_`; signaling it opens the gate.
+  thread_signal(&start_semaphore_);
+}
+
+void HexagonThreadManager::WaitOnThreads() {
+  // Block the calling thread on all workers with the ordinary signal mechanism. A barrier would be
+  // slightly more efficient, but would need extra one-off code to wait on it.
+
+  // Open the start gate first if Start() was never called, to prevent deadlock.
+  if (qurt_sem_get_val(&start_semaphore_) == 0) {
+    Start();
+  }
+
+  // One private semaphore per thread, all starting at zero.
+  std::vector<qurt_sem_t> finished(nthreads_);
+  for (qurt_sem_t& sem : finished) {
+    qurt_sem_init_val(&sem, 0);
+  }
+
+  // Append a signal() of its private semaphore to each thread's queue; it runs only after all
+  // commands queued before it on that thread.
+  for (unsigned idx = 0; idx < nthreads_; ++idx) {
+    TVMStreamHandle handle = reinterpret_cast<TVMStreamHandle>(idx);
+    while (!Dispatch(handle, thread_signal, &finished[idx])) {
+      // pipe full: retry until the worker drains a slot
+    }
+  }
+
+  // Block on the semaphores one after another, then release them.
+  for (qurt_sem_t& sem : finished) {
+    thread_wait(&sem);
+  }
+  for (qurt_sem_t& sem : finished) {
+    qurt_sem_destroy(&sem);
+  }
+}
+
+void HexagonThreadManager::CheckSemaphore(unsigned syncID) {
+  // Lazily create the semaphore for `syncID` the first time Signal/Wait references it; it is
+  // destroyed and freed by the destructor.
+  if (semaphores_.find(syncID) != semaphores_.end()) {
+    return;
+  }
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(malloc(sizeof(qurt_sem_t)));
+  CHECK(sem != nullptr) << "Failed to allocate semaphore for sync ID " << syncID;
+  qurt_sem_init_val(sem, 0);
+  semaphores_.emplace(syncID, sem);
+}
+
+bool HexagonThreadManager::Signal(TVMStreamHandle thread, SyncPoint syncID) {
+  // Make sure the semaphore for `syncID` exists, then queue a signal of it on `thread`.
+  CheckSemaphore(syncID);
+  qurt_sem_t* sem = semaphores_[syncID];
+  DLOG(INFO) << "Dispatching signal to thread " << thread << " on semaphore ID " << syncID
+             << " located @ 0x" << std::hex << sem;
+  return Dispatch(thread, thread_signal, sem);
+}
+
+bool HexagonThreadManager::Wait(TVMStreamHandle thread, SyncPoint syncID) {
+  // Make sure the semaphore for `syncID` exists, then queue a wait on it for `thread`.
+  CheckSemaphore(syncID);
+  qurt_sem_t* sem = semaphores_[syncID];
+  DLOG(INFO) << "Dispatching wait to thread " << thread << " on semaphore ID " << syncID
+             << " located @ 0x" << std::hex << sem;
+  return Dispatch(thread, thread_wait, sem);
+}
+
+/* Create a sync_from_to relationship with a dynamic semaphore allocation.
+Makes use of thread_wait_free to also free the semaphore after sync is complete.
+*/
+bool HexagonThreadManager::SyncFromTo(TVMStreamHandle signal_thread, TVMStreamHandle wait_thread) {
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(malloc(sizeof(qurt_sem_t)));
+  CHECK(sem != nullptr) << "Failed to allocate semaphore for SyncFromTo";
+  qurt_sem_init_val(sem, 0);
+  if (!Dispatch(signal_thread, thread_signal, sem)) {
+    // Nothing has been queued that references `sem`, so release it instead of leaking it.
+    qurt_sem_destroy(sem);
+    free(sem);
+    return false;
+  }
+  // NOTE(review): if this second dispatch fails, the already-queued signal still references
+  // `sem`, so it cannot be freed here and is leaked; callers cannot retry only the wait half.
+  return Dispatch(wait_thread, thread_wait_free, sem);
+}
+
+void HexagonThreadManager::thread_signal(void* semaphore) {
+  DLOG(INFO) << "Signaling semaphore addr 0x" << std::hex << semaphore;
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(semaphore);
+  // Raise the count by QURT_MAX_HTHREAD_LIMIT (the cap on managed threads) rather than by one,
+  // so one signal can release every waiter.
+  qurt_sem_add(sem, QURT_MAX_HTHREAD_LIMIT);
+}
+
+void HexagonThreadManager::thread_wait(void* semaphore) {
+  DLOG(INFO) << "Waiting on semaphore addr 0x" << std::hex << semaphore;
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(semaphore);
+  qurt_sem_down(sem);  // blocks the executing thread until the semaphore can be taken
+}
+
+/* Wait on the passed semaphore object, then destroy and free it (it must come from malloc). */
+void HexagonThreadManager::thread_wait_free(void* semaphore) {
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(semaphore);
+  qurt_sem_down(sem);  // blocks until signal is complete
+  // This command is the semaphore's last user (see SyncFromTo), so release it here.
+  qurt_sem_destroy(sem);
+  free(sem);
+}
+
+void HexagonThreadManager::thread_exit(void* status) {
+  DLOG(INFO) << "thread exiting";
+  // The exit status travels in the void* argument slot; convert it back to an integer.
+  qurt_thread_exit(reinterpret_cast<uint64_t>(status));
+}
+
+void HexagonThreadManager::thread_main(void* context) {
+  ThreadContext* ctx = static_cast<ThreadContext*>(context);
+  const unsigned index = ctx->index;
+  qurt_pipe_t* const pipe = ctx->pipe;
+
+  DLOG(INFO) << "Thread " << index << " spawned";
+
+  // Execute commands from this thread's pipe forever; the thread only terminates when a
+  // dispatched `thread_exit` command ends it from inside the call below.
+  for (;;) {
+    DLOG(INFO) << "Thread " << index << " receiving command";
+    Command* cmd = reinterpret_cast<Command*>(qurt_pipe_receive(pipe));  // blocks if empty
+    // Take the payload and free the Command before running it, since the call may not return.
+    voidfunc fn = cmd->f;
+    void* fn_args = cmd->args;
+    delete cmd;
+    fn(fn_args);
+  }
+}
+
+}  // namespace hexagon
+}  // namespace runtime
+}  // namespace tvm
diff --git a/src/runtime/hexagon/hexagon_thread_manager.h b/src/runtime/hexagon/hexagon_thread_manager.h
new file mode 100644
index 0000000000..3422fef387
--- /dev/null
+++ b/src/runtime/hexagon/hexagon_thread_manager.h
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_
+#define TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_
+
+#include <tvm/runtime/c_runtime_api.h>
+#include <tvm/runtime/logging.h>
+#include <tvm/runtime/packed_func.h>
+
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "hexagon_buffer.h"
+#include "hexagon_buffer_manager.h"
+#include "hexagon_common.h"
+#include "qurt.h"
+
+namespace tvm {
+namespace runtime {
+namespace hexagon {
+
+class HexagonThreadManager {
+  //! \brief Void function.
+  using voidfunc = void (*)(void*);
+  //! \brief Semaphore ID.
+  using SyncPoint = unsigned;
+  //! \brief Alignment of underlying memory allocations.
+  const unsigned MEM_ALIGNMENT = 32;
+  //! \brief Minimum stack size in bytes per thread.
+  const unsigned MIN_STACK_SIZE_BYTES = 0x400;  // 1KB
+  //! \brief Maximum stack size in bytes per thread.
+  const unsigned MAX_STACK_SIZE_BYTES = 0x10000;  // 64KB
+  //! \brief Minimum pipe (or command buffer) size in words (or commands) per thread.
+  const unsigned MIN_PIPE_SIZE_WORDS = 10;
+  //! \brief Maximum pipe (or command buffer) size in words (or commands) per thread.
+  const unsigned MAX_PIPE_SIZE_WORDS = 0x10000;  // 64K words
+
+ public:
+  /*!
+   * \brief Spawn a number of Hexagon threads with a given stack (in bytes) and pipe (a.k.a. command
+   * buffer; in words or commands) within the min and max values specified above.
+   * \param num_threads Number of threads to spawn.
+   * \param thread_stack_size_bytes Stack size in bytes per thread.
+   * \param thread_pipe_size_words Pipe (or command buffer) size in words (or commands).
+   */
+  HexagonThreadManager(unsigned num_threads, unsigned thread_stack_size_bytes,
+                       unsigned thread_pipe_size_words);
+
+  //! \brief Destructor; starts the threads if needed, makes them exit, joins them and frees all
+  //! owned resources.
+  ~HexagonThreadManager();
+
+  // The manager owns threads, pipes, semaphores and buffers that the destructor releases; a copy
+  // would release them a second time, so copying is disabled.
+  HexagonThreadManager(const HexagonThreadManager&) = delete;
+  HexagonThreadManager& operator=(const HexagonThreadManager&) = delete;
+
+  /*!
+   * \brief Get the spawned threads as stream handles.
+   * \returns Vector of stream handles, one per thread, in thread-index order.
+   */
+  const std::vector<TVMStreamHandle> GetStreamHandles();
+
+  /*!
+   * \brief Non-blocking dispatch of a void function and args on a given thread.
+   * \param thread Stream handle of the thread on which to dispatch the void function.
+   * \param f Void function to be dispatched.
+   * \param args Arguments to pass to the void function.
+   * \returns Boolean value indicating success or failure of the dispatch; user must either 1)
+   * `Start` threads executing to clear space in the pipe before retrying dispatch or 2) create a
+   * `HexagonThreadManager` with a larger pipe.
+   */
+  bool Dispatch(TVMStreamHandle thread, voidfunc f, void* args);
+  /*!
+   * \brief Non-blocking signal of a semaphore with a given ID.
+   * \param thread Stream handle of the thread which will signal the semaphore.
+   * \param syncID ID of the semaphore to be signaled.
+   * \returns Boolean value indicating success or failure of the dispatch of the signal; user must
+   * either 1) `Start` threads executing to clear space in the pipe before retrying dispatch or 2)
+   * create a `HexagonThreadManager` with a larger pipe.
+   */
+  bool Signal(TVMStreamHandle thread, SyncPoint syncID);
+  /*!
+   * \brief Non-blocking wait on a semaphore with a given ID.
+   * \param thread Stream handle of the thread which will wait on the semaphore.
+   * \param syncID ID of the semaphore on which to wait.
+   * \returns Boolean value indicating success or failure of the dispatch of the wait; user must
+   * either 1) `Start` threads executing to clear space in the pipe before retrying dispatch or 2)
+   * create a `HexagonThreadManager` with a larger pipe.
+   */
+  bool Wait(TVMStreamHandle thread, SyncPoint syncID);
+  /*!
+   * \brief Creates a synchronization point between two threads by creating a semaphore,
+   * dispatching the `signal_thread` to signal that semaphore and dispatching the `wait_thread` to
+   * wait on that semaphore.
+   * \param signal_thread Stream handle for the thread which will signal the semaphore.
+   * \param wait_thread Stream handle for the thread which will wait on the semaphore.
+   * \returns Boolean value indicating success or failure of the combined dispatch of both the
+   * signal and the wait; user must either 1) `Start` threads executing to clear space in the pipe
+   * before retrying dispatch or 2) create a `HexagonThreadManager` with a larger pipe.
+   */
+  bool SyncFromTo(TVMStreamHandle signal_thread, TVMStreamHandle wait_thread);
+  //! \brief Unblock threads to start execution.
+  void Start();
+  //! \brief Unblock threads to start execution if `Start` has not already been called; blocking
+  //! call to wait until all threads have empty pipes.
+  void WaitOnThreads();
+
+ private:
+  //! \brief Per-thread data handed to `thread_main`: the thread's pipe and its index.
+  struct ThreadContext {
+    qurt_pipe_t* pipe;
+    unsigned index;
+    ThreadContext(qurt_pipe_t* pipe, unsigned index) : pipe(pipe), index(index) {}
+  };
+
+  //! \brief Helper function for the constructor to spawn threads.
+  void SpawnThreads(unsigned thread_stack_size_bytes, unsigned thread_pipe_size_words);
+
+  //! \brief Helper function for `Signal` and `Wait` to create, initialize and map semaphores by ID.
+  void CheckSemaphore(unsigned syncID);
+
+  //! \brief Void function executed by a thread to signal a semaphore.
+  static void thread_signal(void* semaphore);
+
+  //! \brief Void function executed by a thread to wait on a semaphore; used by `Wait`.
+  static void thread_wait(void* semaphore);
+
+  //! \brief Void function executed by a thread to wait on and free a semaphore; used by
+  //! `SyncFromTo`.
+  static void thread_wait_free(void* semaphore);
+
+  //! \brief Void function executed by a thread to exit at time of destruction.
+  static void thread_exit(void* status);
+
+  //! \brief Void function executed by each thread as `main`.
+  static void thread_main(void* context);
+
+  //! \brief Manages underlying HexagonBuffer allocations.
+  HexagonBufferManager hexbuffs_;
+
+  //! \brief Number of threads allocated.
+  unsigned nthreads_{0};
+
+  //! \brief Pointer to the base of the stacks allocated for all threads; size = `nthreads` *
+  //! `thread_stack_size_bytes`.
+  void* stack_buffer_{nullptr};
+
+  //! \brief Pointer to the base of the pipes (or command buffers) allocated for all threads; size =
+  //! `nthreads` * `thread_pipe_size_words` * sizeof(word).
+  void* pipe_buffer_{nullptr};
+
+  //! \brief QURT thread structure for each spawned thread.
+  std::vector<qurt_thread_t> threads_;
+
+  //! \brief QURT pipe (or command buffer) structure for each spawned thread.
+  std::vector<qurt_pipe_t> pipes_;
+
+  //! \brief Thread context passed into each `thread_main` function.
+  std::vector<ThreadContext*> contexts_;
+
+  //! \brief Semaphores used by `Signal` and `Wait` mapped by ID.
+  std::unordered_map<unsigned, qurt_sem_t*> semaphores_;
+
+  //! \brief Start semaphore created at time of construction; signaled by `Start`.
+  qurt_sem_t start_semaphore_;
+
+  /*!
+   *\brief Encapsulate a void function pointer + arg pointer; sent via pipe to threads to execute.
+   */
+  struct Command {
+    voidfunc f;
+    void* args;
+    Command(voidfunc f, void* args) : f(f), args(args) {}
+  };
+};
+
+}  // namespace hexagon
+}  // namespace runtime
+}  // namespace tvm
+
+#endif  // TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_
diff --git a/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc b/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc
new file mode 100644
index 0000000000..aa86e4638d
--- /dev/null
+++ b/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <tvm/runtime/logging.h>
+
+#include <memory>
+#include <vector>
+
+#include "../src/runtime/hexagon/hexagon_thread_manager.h"
+
+using namespace tvm::runtime;
+using namespace tvm::runtime::hexagon;
+
+class HexagonThreadManagerTest : public ::testing::Test {
+ protected:
+  //! Build a fresh thread manager and fetch its stream handles before every test.
+  void SetUp() override {
+    htm = std::make_unique<HexagonThreadManager>(threads, stack_size, pipe_size);
+    streams = htm->GetStreamHandles();
+  }
+  //! Destroy the manager explicitly so teardown happens here rather than in the fixture dtor.
+  void TearDown() override { htm.reset(); }
+  //! Thread manager under test; owned by the fixture (no manual new/delete).
+  std::unique_ptr<HexagonThreadManager> htm;
+  //! Stream handles returned by `GetStreamHandles` (the `init` test expects one per thread).
+  std::vector<TVMStreamHandle> streams;
+  //! Scratch value written by the dispatched test functions.
+  int answer{0};
+  const unsigned threads{6};
+  const unsigned pipe_size{100};
+  const unsigned stack_size{0x4000};  // 16KB
+};
+
+TEST_F(HexagonThreadManagerTest, ctor_errors) {
+  // Every out-of-range constructor argument is expected to raise InternalError.
+  // Thread count: none at all, then far too many.
+  ASSERT_THROW(HexagonThreadManager(0, stack_size, pipe_size), InternalError);
+  ASSERT_THROW(HexagonThreadManager(0x10000000, stack_size, pipe_size), InternalError);
+  // Stack size (with a valid thread count): zero, then far too large.
+  ASSERT_THROW(HexagonThreadManager(threads, 0, pipe_size), InternalError);
+  ASSERT_THROW(HexagonThreadManager(threads, 0x10000000, pipe_size), InternalError);
+  // Pipe size (with a valid thread count): too small, then far too large.
+  ASSERT_THROW(HexagonThreadManager(threads, stack_size, 9), InternalError);
+  ASSERT_THROW(HexagonThreadManager(threads, stack_size, 0x10000000), InternalError);
+}
+
+TEST_F(HexagonThreadManagerTest, init) {
+  // SetUp must have produced a manager and exactly one stream handle per requested thread.
+  CHECK(htm != nullptr);
+  CHECK_EQ(streams.size(), threads);
+}
+
+//! \brief Dispatch target: stores 42 into the `int` that `answer` points to.
+//! File-local so this generic helper name cannot collide with other test files in the binary.
+static void get_the_answer(void* answer) { *static_cast<int*>(answer) = 42; }
+
+TEST_F(HexagonThreadManagerTest, dispatch) {
+  // Queue one job on the first stream, release the workers with Start(), then block until the
+  // queued work is done and confirm the job's side effect.
+  TVMStreamHandle target = streams[0];
+  htm->Dispatch(target, get_the_answer, &answer);
+  htm->Start();
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, dispatch_wait) {
+  // Same as `dispatch` but without an explicit Start(): the test expects WaitOnThreads() alone
+  // to let the queued job complete.
+  TVMStreamHandle target = streams[0];
+  htm->Dispatch(target, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, wait_signal) {
+  // Stream 0 waits on semaphore 0, stream 1 signals it, then stream 0 runs its job.
+  TVMStreamHandle waiter = streams[0];
+  TVMStreamHandle signaler = streams[1];
+  htm->Wait(waiter, 0);
+  htm->Signal(signaler, 0);
+  htm->Dispatch(waiter, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, re_signal) {
+  // Semaphore 0 is signalled twice for a single wait; the waiting stream must still run its job.
+  TVMStreamHandle waiter = streams[0];
+  TVMStreamHandle signaler = streams[1];
+  htm->Wait(waiter, 0);
+  for (int repeat = 0; repeat < 2; ++repeat) {
+    htm->Signal(signaler, 0);
+  }
+  htm->Dispatch(waiter, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, re_wait) {
+  // Stream 0 waits on semaphore 0 a second time after a single signal; the test expects the
+  // job queued afterwards to still run.
+  TVMStreamHandle waiter = streams[0];
+  TVMStreamHandle signaler = streams[1];
+  htm->Wait(waiter, 0);
+  htm->Signal(signaler, 0);
+  htm->Wait(waiter, 0);
+  htm->Dispatch(waiter, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, wait_signal_x2) {
+  // Two independent wait/signal pairs (semaphores 0 and 1) between streams 0 and 1.
+  TVMStreamHandle waiter = streams[0];
+  TVMStreamHandle signaler = streams[1];
+  for (int sem = 0; sem < 2; ++sem) {
+    htm->Wait(waiter, sem);
+    htm->Signal(signaler, sem);
+  }
+  htm->Dispatch(waiter, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, signal_wait) {
+  // The signal is queued before the matching wait; the waiter must still proceed to its job.
+  TVMStreamHandle waiter = streams[0];
+  TVMStreamHandle signaler = streams[1];
+  htm->Signal(signaler, 0);
+  htm->Wait(waiter, 0);
+  htm->Dispatch(waiter, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, sync_from_to) {
+  // Make stream 0 depend on stream 1, then run a job on stream 0.
+  TVMStreamHandle upstream = streams[1];
+  TVMStreamHandle downstream = streams[0];
+  htm->SyncFromTo(upstream, downstream);
+  htm->Dispatch(downstream, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, sync_from_to_self) {
+  // Synchronize a stream with itself; the test expects the job queued afterwards to still run.
+  TVMStreamHandle self = streams[0];
+  htm->SyncFromTo(self, self);
+  htm->Dispatch(self, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, sync_from_to_x2) {
+  // Synchronize streams 0 and 1 in both directions, then run a job on stream 0.
+  TVMStreamHandle first = streams[0];
+  TVMStreamHandle second = streams[1];
+  htm->SyncFromTo(first, second);
+  htm->SyncFromTo(second, first);
+  htm->Dispatch(first, get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, sync_from_to_all) {
+  // Chain every stream to the next lower one (5 -> 4 -> ... -> 0), then run a job on stream 0.
+  for (int from = 5; from > 0; --from) {
+    htm->SyncFromTo(streams[from], streams[from - 1]);
+  }
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, pipe_fill) {
+  // Fill the pipe: queue exactly `pipe_size` jobs on one stream. Each dispatch is checked so a
+  // silently rejected command cannot let the test pass without the pipe actually being filled.
+  // The index is unsigned to match `pipe_size` (no signed/unsigned comparison).
+  for (unsigned i = 0; i < pipe_size; ++i) {
+    bool queued = htm->Dispatch(streams[0], get_the_answer, &answer);
+    CHECK(queued);
+  }
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 42);
+}
+
+TEST_F(HexagonThreadManagerTest, pipe_overflow) {
+  // Fill the pipe, checking every dispatch: if an earlier command were already rejected, the
+  // overflow assertion below would pass without testing the real capacity boundary.
+  for (unsigned i = 0; i < pipe_size; ++i) {
+    bool queued = htm->Dispatch(streams[0], get_the_answer, &answer);
+    CHECK(queued);
+  }
+  // One more command than the pipe holds must be rejected.
+  bool space = htm->Dispatch(streams[0], get_the_answer, &answer);
+  CHECK_EQ(space, false);
+}
+
+//! \brief Dispatch target: adds one to the `int` that `voidptr` points to.
+//! Not atomic; callers order concurrent uses via the thread manager's sync primitives.
+//! File-local so this generic helper name cannot collide with other test files in the binary.
+static void increment(void* voidptr) {
+  int* intptr = static_cast<int*>(voidptr);
+  ++(*intptr);
+}
+
+TEST_F(HexagonThreadManagerTest, producer_consumer) {
+  // Each stream from 5 down to 1 increments `answer` and then hands off to the next lower
+  // stream via SyncFromTo; stream 0 does the last increment. The test relies on that chaining to
+  // keep the six non-atomic increments from racing.
+  for (int stage = 5; stage > 0; --stage) {
+    htm->Dispatch(streams[stage], increment, &answer);
+    htm->SyncFromTo(streams[stage], streams[stage - 1]);
+  }
+  htm->Dispatch(streams[0], increment, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 6);
+}
+
+TEST_F(HexagonThreadManagerTest, producer_consumer_signal_wait) {
+  // Streams 0-4 each first wait on the semaphore whose ID equals their stream index.
+  for (int stage = 0; stage < 5; ++stage) {
+    htm->Wait(streams[stage], stage);
+  }
+  // Stream N (5 down to 1) increments, then signals semaphore N-1 to release the next stream.
+  for (int stage = 5; stage > 0; --stage) {
+    htm->Dispatch(streams[stage], increment, &answer);
+    htm->Signal(streams[stage], stage - 1);
+  }
+  htm->Dispatch(streams[0], increment, &answer);
+  htm->WaitOnThreads();
+  CHECK_EQ(answer, 6);
+}
+
+//! \brief Argument bundle for `append`: the vector to extend and the value to push onto it.
+struct ToAppend {
+  ToAppend(std::vector<int>* addr, int value) : arr{addr}, value{value} {}
+  std::vector<int>* arr;
+  int value;
+};
+
+//! \brief Dispatch target: pushes `value` onto the vector referenced by a `ToAppend` bundle.
+//! File-local so this generic helper name cannot collide with other test files in the binary.
+static void append(void* toappend) {
+  ToAppend* cmd = static_cast<ToAppend*>(toappend);
+  cmd->arr->push_back(cmd->value);
+}
+
+TEST_F(HexagonThreadManagerTest, thread_order) {
+  // Stream i appends i to `arr` and is chained to stream i + 1 with SyncFromTo, so the expected
+  // contents are 0, 1, ..., threads - 1 in order. The ToAppend bundles live on this stack frame
+  // and must outlive WaitOnThreads().
+  std::vector<int> arr;
+
+  ToAppend cmd0(&arr, 0);
+  htm->Dispatch(streams[0], append, &cmd0);
+  htm->SyncFromTo(streams[0], streams[1]);
+
+  ToAppend cmd1(&arr, 1);
+  htm->Dispatch(streams[1], append, &cmd1);
+  htm->SyncFromTo(streams[1], streams[2]);
+
+  ToAppend cmd2(&arr, 2);
+  htm->Dispatch(streams[2], append, &cmd2);
+  htm->SyncFromTo(streams[2], streams[3]);
+
+  ToAppend cmd3(&arr, 3);
+  htm->Dispatch(streams[3], append, &cmd3);
+  htm->SyncFromTo(streams[3], streams[4]);
+
+  ToAppend cmd4(&arr, 4);
+  htm->Dispatch(streams[4], append, &cmd4);
+  htm->SyncFromTo(streams[4], streams[5]);
+
+  ToAppend cmd5(&arr, 5);
+  htm->Dispatch(streams[5], append, &cmd5);
+  htm->WaitOnThreads();
+  // Check the length first: indexing past the end of a short vector would be undefined behavior.
+  CHECK_EQ(arr.size(), threads);
+  for (unsigned i = 0; i < threads; ++i) {
+    CHECK_EQ(arr[i], static_cast<int>(i));
+  }
+}
+
+TEST_F(HexagonThreadManagerTest, thread_order_signal_wait) {
+  // Same ordering check as `thread_order`, but streams 1-5 each wait on the semaphore matching
+  // their index and are released by the previous stream's Signal. The ToAppend bundles live on
+  // this stack frame and must outlive WaitOnThreads().
+  std::vector<int> arr;
+
+  htm->Wait(streams[1], 1);
+  htm->Wait(streams[2], 2);
+  htm->Wait(streams[3], 3);
+  htm->Wait(streams[4], 4);
+  htm->Wait(streams[5], 5);
+
+  ToAppend cmd0(&arr, 0);
+  htm->Dispatch(streams[0], append, &cmd0);
+  htm->Signal(streams[0], 1);
+
+  ToAppend cmd1(&arr, 1);
+  htm->Dispatch(streams[1], append, &cmd1);
+  htm->Signal(streams[1], 2);
+
+  ToAppend cmd2(&arr, 2);
+  htm->Dispatch(streams[2], append, &cmd2);
+  htm->Signal(streams[2], 3);
+
+  ToAppend cmd3(&arr, 3);
+  htm->Dispatch(streams[3], append, &cmd3);
+  htm->Signal(streams[3], 4);
+
+  ToAppend cmd4(&arr, 4);
+  htm->Dispatch(streams[4], append, &cmd4);
+  htm->Signal(streams[4], 5);
+
+  ToAppend cmd5(&arr, 5);
+  htm->Dispatch(streams[5], append, &cmd5);
+  htm->WaitOnThreads();
+  // Check the length first: indexing past the end of a short vector would be undefined behavior.
+  CHECK_EQ(arr.size(), threads);
+  for (unsigned i = 0; i < threads; ++i) {
+    CHECK_EQ(arr[i], static_cast<int>(i));
+  }
+}
+
+//! \brief Argument bundle for `thread_write_val`: destination address and the value to store.
+struct ToWrite {
+  ToWrite(int* addr, int value) : addr{addr}, value{value} {}
+  int* addr;
+  int value;
+};
+
+//! \brief Dispatch target: performs the write described by a heap-allocated `ToWrite` bundle,
+//! then deletes the bundle (ownership is handed over with a successful dispatch).
+//! File-local so this helper name cannot collide with other test files in the binary.
+static void thread_write_val(void* towrite) {
+  ToWrite* cmd = static_cast<ToWrite*>(towrite);
+  *(cmd->addr) = cmd->value;
+  delete cmd;
+}
+
+TEST_F(HexagonThreadManagerTest, dispatch_writes) {
+  // Give every stream its own heap-allocated ToWrite (value = 2 * index); the dispatched
+  // function performs the write and frees the bundle, so ownership passes on a successful
+  // dispatch. A rejected dispatch would otherwise leak the bundle, so it is freed here.
+  std::vector<int> array(streams.size());
+  std::vector<int> truth(streams.size());
+  for (size_t i = 0; i < streams.size(); ++i) {
+    int val = static_cast<int>(i * 2);
+    ToWrite* cmd = new ToWrite(&array[i], val);
+    bool queued = htm->Dispatch(streams[i], thread_write_val, cmd);
+    if (!queued) {
+      delete cmd;  // never reached a worker, so nothing else will free it
+    }
+    CHECK(queued);
+    truth[i] = val;
+  }
+  htm->Start();
+  htm->WaitOnThreads();
+  for (size_t i = 0; i < streams.size(); ++i) {
+    CHECK_EQ(array[i], truth[i]);
+  }
+}