You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tvm.apache.org by kp...@apache.org on 2022/06/13 17:23:01 UTC
[tvm] branch main updated: [Hexagon] Add HexagonThreadManager (#11653)
This is an automated email from the ASF dual-hosted git repository.
kparzysz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tvm.git
The following commit(s) were added to refs/heads/main by this push:
new 76b9ce9b1f [Hexagon] Add HexagonThreadManager (#11653)
76b9ce9b1f is described below
commit 76b9ce9b1f7d2b7e64b4b9c9d456a02b8a010473
Author: Adam Straw <as...@octoml.ai>
AuthorDate: Mon Jun 13 10:22:54 2022 -0700
[Hexagon] Add HexagonThreadManager (#11653)
* Adding initial threadmanager class
* Fixed compile errors
* Moving constant defines inside class
* Updating qurt includes
* use default scope for hexagon buffers
* Updating buffer allocations
* Fixed bug where array of pointers treated as array of structs
* - Updated HexagonDeviceAPI to use HexagonThreadManager
- Updated HexagonThreadManager interface to use TVMStreams
- Added second `Dispatch` interface in thread manager to use PackedFuncs
- Updated thread manager to use vector for dynamic semaphore allocation
- Added "#if defined(__hexagon__)" in several places to prevent compilation errors
* Bug fixes + interface addition + basic thread tests
- Fixed GetStreams not returning the streams properly
- Added missing semaphore cleanup to prevent qurt kernel resource leakage
- new interface functions:
- Start() : now all worker threads are blocked on initialization until ThreadManager->Start() is called
- WaitOnThreads() : blocking call which waits until all worker thread queues are empty
- added extra debug logging
- Two new basic thread tests working
* Adding initial ThreadManager tests
* HexagonThreadManager tests and refactor
* remove stack / pipe size member vars
* init pointers in the header file
* move all mem allocs to SpawnThreads
* start_semaphore as object instead of pointer
* fix bug with WaitOnThreads deadlock + Wait/Signal off by one error
* add / refactor Signal / Wait tests
* add SyncFromTo test cases
* add Dispatch test cases
* add pipe fill and overflow cases
* Updating dispatch to return bool and fix pipe overflow problem
* change around min / max values for stack / pipe
* integrate pipe fill / overflow tests back into HTM test suite
* use HexagonBuffer
* assert if stack / pipe sizes fall below min
* Changed semaphore vector to store pointers, not structs (fixes vector capacity adjustment invalidating in-use addresses).
* add producer consumer, thread order test cases
* change to unordered_map for semaphores and remove PreallocateSyncs
* tests running on device
* code cleanup for compile warnings
* remove #if defined(__hexagon__) guards
* copyright, format, lint
* add hexagon buffer map class
* remove redundant thread manager tests
* revert Hex Dev API changes for threading
* add comments; remove untested code to dispatch / wrap a packed func
* pass pipe address and not HTM pointer to thread context
* rename to HexagonBufferManager
* cleanup ahead of PR
* use DLOG(INFO)
* refactor GetStreamHandles to return a vector by value
* adjust HexagonBufferManager methods; use thread_manager file names
* style guidelines and debug prints
* reinterpret cast for TVMStreamHandle
* end member variables with underscore
Co-authored-by: Joseph McMahan <jm...@octoml.ai>
---
src/runtime/hexagon/hexagon_buffer_manager.h | 81 ++++++
src/runtime/hexagon/hexagon_device_api.cc | 29 +-
src/runtime/hexagon/hexagon_device_api.h | 23 +-
src/runtime/hexagon/hexagon_thread_manager.cc | 291 ++++++++++++++++++
src/runtime/hexagon/hexagon_thread_manager.h | 194 ++++++++++++
.../hexagon/hexagon_thread_manager_tests.cc | 324 +++++++++++++++++++++
6 files changed, 901 insertions(+), 41 deletions(-)
diff --git a/src/runtime/hexagon/hexagon_buffer_manager.h b/src/runtime/hexagon/hexagon_buffer_manager.h
new file mode 100644
index 0000000000..658a39fac8
--- /dev/null
+++ b/src/runtime/hexagon/hexagon_buffer_manager.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_
+#define TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_
+
+#include <tvm/runtime/logging.h>
+
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "hexagon_buffer.h"
+
+namespace tvm {
+namespace runtime {
+namespace hexagon {
+
+/*!
+ * \brief Owns HexagonBuffer objects and maps each buffer's data pointer back to its owner.
+ */
+class HexagonBufferManager {
+ public:
+  /*!
+   * \brief Free a HexagonBuffer.
+   * \param ptr Address of the HexagonBuffer as returned by `AllocateHexagonBuffer`.
+   */
+  void FreeHexagonBuffer(void* ptr) {
+    auto it = hexagon_buffer_map_.find(ptr);
+    CHECK(it != hexagon_buffer_map_.end())
+        << "Attempt made to free unknown or already freed dataspace allocation";
+    CHECK(it->second != nullptr);
+    // Erasing the entry destroys the owning unique_ptr, which destroys the HexagonBuffer.
+    hexagon_buffer_map_.erase(it);
+  }
+  /*!
+   * \brief Allocate a HexagonBuffer and take ownership of it.
+   * \param args Templated arguments to pass through to HexagonBuffer constructor.
+   * \return The buffer's data pointer (`HexagonBuffer::GetPointer()`); this is the key to pass to
+   * `FreeHexagonBuffer`, `count` and `find`.
+   */
+  template <typename... Args>
+  void* AllocateHexagonBuffer(Args&&... args) {
+    auto buf = std::make_unique<HexagonBuffer>(std::forward<Args>(args)...);
+    void* ptr = buf->GetPointer();
+    // A failed insert would silently destroy `buf` and hand back a dangling pointer; make it loud.
+    bool inserted = hexagon_buffer_map_.insert({ptr, std::move(buf)}).second;
+    CHECK(inserted) << "HexagonBuffer data pointer is already managed by this HexagonBufferManager";
+    return ptr;
+  }
+
+  //! \brief Returns 1 if `ptr` is a live allocation managed here, otherwise 0.
+  size_t count(void* ptr) const { return hexagon_buffer_map_.count(ptr); }
+
+  //! \brief Returns the HexagonBuffer owning `ptr`, or nullptr if `ptr` is not managed here.
+  HexagonBuffer* find(void* ptr) const {
+    auto it = hexagon_buffer_map_.find(ptr);
+    if (it != hexagon_buffer_map_.end()) {
+      return it->second.get();
+    }
+    return nullptr;
+  }
+
+ private:
+  //! \brief Contains the HexagonBuffer objects managed by this class, keyed by data pointer.
+  std::unordered_map<void*, std::unique_ptr<HexagonBuffer>> hexagon_buffer_map_;
+};
+
+} // namespace hexagon
+} // namespace runtime
+} // namespace tvm
+
+#endif // TVM_RUNTIME_HEXAGON_HEXAGON_BUFFER_MANAGER_H_
diff --git a/src/runtime/hexagon/hexagon_device_api.cc b/src/runtime/hexagon/hexagon_device_api.cc
index c9c1586008..92a7b22784 100644
--- a/src/runtime/hexagon/hexagon_device_api.cc
+++ b/src/runtime/hexagon/hexagon_device_api.cc
@@ -32,7 +32,6 @@
#include <cstring>
#include "../workspace_pool.h"
-#include "hexagon_buffer.h"
#include "hexagon_common.h"
namespace tvm {
@@ -74,14 +73,14 @@ void* HexagonDeviceAPI::AllocDataSpace(Device dev, int ndim, const int64_t* shap
}
if (ndim == 0) {
- return AllocateHexagonBuffer(typesize, alignment, mem_scope);
+ return hexbuffs.AllocateHexagonBuffer(typesize, alignment, mem_scope);
} else if (ndim == 1) {
size_t nbytes = shape[0] * typesize;
- return AllocateHexagonBuffer(nbytes, alignment, mem_scope);
+ return hexbuffs.AllocateHexagonBuffer(nbytes, alignment, mem_scope);
} else if (ndim == 2) {
size_t nallocs = shape[0];
size_t nbytes = shape[1] * typesize;
- return AllocateHexagonBuffer(nallocs, nbytes, alignment, mem_scope);
+ return hexbuffs.AllocateHexagonBuffer(nallocs, nbytes, alignment, mem_scope);
} else {
LOG(FATAL) << "Hexagon Device API supports only 1d and 2d allocations, but received ndim = "
<< ndim;
@@ -97,13 +96,13 @@ void* HexagonDeviceAPI::AllocDataSpace(Device dev, size_t nbytes, size_t alignme
if (alignment < kHexagonAllocAlignment) {
alignment = kHexagonAllocAlignment;
}
- return AllocateHexagonBuffer(nbytes, alignment, String("global"));
+ return hexbuffs.AllocateHexagonBuffer(nbytes, alignment, String("global"));
}
void HexagonDeviceAPI::FreeDataSpace(Device dev, void* ptr) {
CHECK(ptr) << "buffer pointer is null";
CHECK(IsValidDevice(dev)) << "dev.device_type: " << dev.device_type;
- FreeHexagonBuffer(ptr);
+ hexbuffs.FreeHexagonBuffer(ptr);
}
// WorkSpace: runtime allocations for Hexagon
@@ -119,7 +118,7 @@ void* HexagonDeviceAPI::AllocWorkspace(Device dev, size_t size, DLDataType type_
void HexagonDeviceAPI::FreeWorkspace(Device dev, void* data) {
CHECK(IsValidDevice(dev)) << "dev.device_type: " << dev.device_type;
- CHECK(hexagon_buffer_map_.count(data) != 0)
+ CHECK(hexbuffs.count(data) != 0)
<< "Attempt made to free unknown or already freed workspace allocation";
dmlc::ThreadLocalStore<HexagonWorkspacePool>::Get()->FreeWorkspace(dev, data);
}
@@ -143,13 +142,7 @@ void HexagonDeviceAPI::CopyDataFromTo(DLTensor* from, DLTensor* to, TVMStreamHan
CHECK_EQ(to->byte_offset, 0);
CHECK_EQ(GetDataSize(*from), GetDataSize(*to));
- auto lookup_hexagon_buffer = [this](void* ptr) -> HexagonBuffer* {
- auto it = this->hexagon_buffer_map_.find(ptr);
- if (it != this->hexagon_buffer_map_.end()) {
- return it->second.get();
- }
- return nullptr;
- };
+ auto lookup_hexagon_buffer = [this](void* ptr) -> HexagonBuffer* { return hexbuffs.find(ptr); };
HexagonBuffer* hex_from_buf = lookup_hexagon_buffer(from->data);
HexagonBuffer* hex_to_buf = lookup_hexagon_buffer(to->data);
@@ -172,14 +165,6 @@ void HexagonDeviceAPI::CopyDataFromTo(const void* from, size_t from_offset, void
memcpy(static_cast<char*>(to) + to_offset, static_cast<const char*>(from) + from_offset, size);
}
-void HexagonDeviceAPI::FreeHexagonBuffer(void* ptr) {
- auto it = hexagon_buffer_map_.find(ptr);
- CHECK(it != hexagon_buffer_map_.end())
- << "Attempt made to free unknown or already freed dataspace allocation";
- CHECK(it->second != nullptr);
- hexagon_buffer_map_.erase(it);
-}
-
TVM_REGISTER_GLOBAL("device_api.hexagon.mem_copy").set_body([](TVMArgs args, TVMRetValue* rv) {
void* dst = args[0];
void* src = args[1];
diff --git a/src/runtime/hexagon/hexagon_device_api.h b/src/runtime/hexagon/hexagon_device_api.h
index 6f65bf4027..4da12e35fb 100644
--- a/src/runtime/hexagon/hexagon_device_api.h
+++ b/src/runtime/hexagon/hexagon_device_api.h
@@ -30,6 +30,7 @@
#include <vector>
#include "hexagon_buffer.h"
+#include "hexagon_buffer_manager.h"
namespace tvm {
namespace runtime {
@@ -72,7 +73,7 @@ class HexagonDeviceAPI final : public DeviceAPI {
*/
void* AllocWorkspace(Device dev, size_t size, DLDataType type_hint) final;
- //! Erase from tracked hexagon_buffer_map and free
+ //! Erase from HexagonBufferManager and free
void FreeWorkspace(Device dev, void* data) final;
/*!
@@ -127,18 +128,6 @@ class HexagonDeviceAPI final : public DeviceAPI {
TVMStreamHandle stream) final;
private:
- /*! \brief Helper to allocate a HexagonBuffer and register the result
- * in the owned buffer map.
- * \return Raw data storage managed by the hexagon buffer
- */
- template <typename... Args>
- void* AllocateHexagonBuffer(Args&&... args) {
- auto buf = std::make_unique<HexagonBuffer>(std::forward<Args>(args)...);
- void* ptr = buf->GetPointer();
- hexagon_buffer_map_.insert({ptr, std::move(buf)});
- return ptr;
- }
-
/*! \brief Helper to check if the device type is valid for the Hexagon Device API
* \return Boolean indicating whether the device type is valid
*/
@@ -148,12 +137,8 @@ class HexagonDeviceAPI final : public DeviceAPI {
(DLDeviceType(dev.device_type) == kDLCPU);
}
- /*! \brief Helper to free a HexagonBuffer and unregister the result
- * from the owned buffer map.
- */
- void FreeHexagonBuffer(void* ptr);
- //! Lookup table for the HexagonBuffer managing an allocation.
- std::unordered_map<void*, std::unique_ptr<HexagonBuffer>> hexagon_buffer_map_;
+ //! \brief Manages underlying HexagonBuffer allocations
+ HexagonBufferManager hexbuffs;
};
} // namespace hexagon
} // namespace runtime
diff --git a/src/runtime/hexagon/hexagon_thread_manager.cc b/src/runtime/hexagon/hexagon_thread_manager.cc
new file mode 100644
index 0000000000..5d67b142e5
--- /dev/null
+++ b/src/runtime/hexagon/hexagon_thread_manager.cc
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "hexagon_thread_manager.h"
+
+#include <cstdint>
+
+namespace tvm {
+namespace runtime {
+namespace hexagon {
+
+HexagonThreadManager::HexagonThreadManager(unsigned num_threads, unsigned thread_stack_size_bytes,
+                                           unsigned thread_pipe_size_words) {
+  // Reject bad configurations before anything is allocated. More software threads than hardware
+  // threads could in principle be managed, but qurt defines no constant for that upper bound.
+  CHECK(num_threads);
+  CHECK_LE(num_threads, QURT_MAX_HTHREAD_LIMIT);
+  CHECK_GE(thread_stack_size_bytes, MIN_STACK_SIZE_BYTES);
+  CHECK_LE(thread_stack_size_bytes, MAX_STACK_SIZE_BYTES);
+  CHECK_GE(thread_pipe_size_words, MIN_PIPE_SIZE_WORDS);
+  CHECK_LE(thread_pipe_size_words, MAX_PIPE_SIZE_WORDS);
+  nthreads_ = num_threads;
+
+  DLOG(INFO) << "Spawning threads";
+  SpawnThreads(thread_stack_size_bytes, thread_pipe_size_words);
+
+  // The start semaphore begins at 0 and a wait on it is the first command in every pipe, so no
+  // worker runs any later command until Start() signals it.
+  qurt_sem_init_val(&start_semaphore_, 0);
+  for (const TVMStreamHandle stream : GetStreamHandles()) {
+    Dispatch(stream, thread_wait, &start_semaphore_);
+  }
+}
+
+HexagonThreadManager::~HexagonThreadManager() {
+  // Workers are parked on the start semaphore until Start(); release them first, otherwise the
+  // exit commands queued below would never be consumed.
+  if (qurt_sem_get_val(&start_semaphore_) == 0) {
+    Start();
+  }
+
+  DLOG(INFO) << "Threads started";
+
+  // Queue an exit command (status 0) on every worker, retrying while its pipe is full.
+  for (unsigned tid = 0; tid < nthreads_; ++tid) {
+    TVMStreamHandle handle = reinterpret_cast<TVMStreamHandle>(tid);
+    while (!Dispatch(handle, thread_exit, nullptr)) {
+    }
+  }
+
+  DLOG(INFO) << "Threads exited";
+
+  // Wait for every worker to terminate; joining an already-exited thread returns immediately.
+  // The exit status itself is not used.
+  int exit_status;
+  for (qurt_thread_t worker : threads_) {
+    qurt_thread_join(worker, &exit_status);
+  }
+
+  DLOG(INFO) << "Threads joined";
+
+  // Tear down the start semaphore and every semaphore lazily created by Signal()/Wait().
+  qurt_sem_destroy(&start_semaphore_);
+  for (const auto& entry : semaphores_) {
+    qurt_sem_t* sem = entry.second;
+    qurt_sem_destroy(sem);
+    free(sem);
+  }
+
+  DLOG(INFO) << "Semaphores destroyed";
+
+  // Release each worker's pipe and the context object that was handed to thread_main.
+  for (unsigned tid = 0; tid < nthreads_; ++tid) {
+    qurt_pipe_destroy(&pipes_[tid]);
+    delete contexts_[tid];
+  }
+
+  DLOG(INFO) << "Pipes and contexts deleted";
+
+  // Return the stack and pipe storage allocated in SpawnThreads.
+  hexbuffs_.FreeHexagonBuffer(stack_buffer_);
+  hexbuffs_.FreeHexagonBuffer(pipe_buffer_);
+
+  DLOG(INFO) << "Buffers freed";
+}
+
+void HexagonThreadManager::SpawnThreads(unsigned thread_stack_size_bytes,
+                                        unsigned thread_pipe_size_words) {
+  // Allocate the stacks of all threads as one contiguous block; each thread gets its own slice.
+  stack_buffer_ = hexbuffs_.AllocateHexagonBuffer(thread_stack_size_bytes * nthreads_,
+                                                  MEM_ALIGNMENT, String("global"));
+  // Allocate the pipe (command queue) storage of all threads as one contiguous block as well.
+  unsigned thread_pipe_size_bytes = thread_pipe_size_words * sizeof(qurt_pipe_data_t);
+  pipe_buffer_ = hexbuffs_.AllocateHexagonBuffer(thread_pipe_size_bytes * nthreads_, MEM_ALIGNMENT,
+                                                 String("global"));
+
+  // Sized once here and never resized again, so element addresses handed to threads stay valid.
+  threads_.resize(nthreads_);
+  pipes_.resize(nthreads_);
+  contexts_.resize(nthreads_);
+
+  DLOG(INFO) << "Buffers allocated";
+
+  // First, create pipe resources for all threads, carving each pipe's storage from pipe_buffer_.
+  char* next_pipe_start = reinterpret_cast<char*>(pipe_buffer_);
+  for (unsigned i = 0; i < nthreads_; i++) {
+    qurt_pipe_attr_t pipe_attr;
+    qurt_pipe_attr_init(&pipe_attr);
+    qurt_pipe_attr_set_buffer(&pipe_attr, reinterpret_cast<qurt_pipe_data_t*>(next_pipe_start));
+    next_pipe_start += thread_pipe_size_bytes;
+    qurt_pipe_attr_set_buffer_partition(&pipe_attr, QURT_PIPE_ATTR_MEM_PARTITION_RAM);
+    qurt_pipe_attr_set_elements(&pipe_attr, thread_pipe_size_words);
+
+    // create the pipe
+    int rc = qurt_pipe_init(&pipes_[i], &pipe_attr);
+    CHECK_EQ(rc, QURT_EOK);
+  }
+
+  DLOG(INFO) << "Pipes created";
+
+  // Create all threads, each with its own slice of stack_buffer_ and a context naming its pipe.
+  char* next_stack_start = reinterpret_cast<char*>(stack_buffer_);
+  for (unsigned i = 0; i < nthreads_; i++) {
+    // create and initialize the thread attr
+    qurt_thread_attr_t thread_attr;
+    char name[32];
+    qurt_thread_attr_init(&thread_attr);
+    qurt_thread_attr_set_stack_addr(&thread_attr, next_stack_start);
+    qurt_thread_attr_set_stack_size(&thread_attr, thread_stack_size_bytes);
+    // `i` is unsigned, so it must be printed with %u (%d is a format/argument type mismatch).
+    snprintf(name, sizeof(name), "thread %u", i);
+    qurt_thread_attr_set_name(&thread_attr, name);
+    next_stack_start += thread_stack_size_bytes;
+
+    // create the thread; its context is deleted by the destructor
+    contexts_[i] = new ThreadContext(&pipes_[i], i);
+    int rc = qurt_thread_create(&threads_[i], &thread_attr, thread_main, contexts_[i]);
+    CHECK_EQ(rc, QURT_EOK);
+  }
+
+  DLOG(INFO) << "Threads created";
+}
+
+const std::vector<TVMStreamHandle> HexagonThreadManager::GetStreamHandles() {
+  // A stream handle is just the thread's index into `threads_`, carried in a pointer-sized value.
+  std::vector<TVMStreamHandle> handles;
+  handles.reserve(nthreads_);
+  for (unsigned idx = 0; idx != nthreads_; ++idx) {
+    handles.emplace_back(reinterpret_cast<TVMStreamHandle>(idx));
+  }
+  return handles;
+}
+
+bool HexagonThreadManager::Dispatch(TVMStreamHandle stream, voidfunc f, void* args) {
+  // Stream handles are thread indices stored in a pointer (see GetStreamHandles). Go through
+  // uintptr_t so this is well-formed even where pointers are wider than `unsigned`.
+  unsigned thread = static_cast<unsigned>(reinterpret_cast<uintptr_t>(stream));
+  CHECK_LT(thread, nthreads_) << "Invalid stream handle " << thread;
+  DLOG(INFO) << "Dispatching to stream " << thread;
+  Command* cmd = new Command(f, args);  // freed by the receiving thread once dequeued
+  qurt_pipe_data_t msg = static_cast<qurt_pipe_data_t>(reinterpret_cast<uintptr_t>(cmd));
+  qurt_pipe_t* pipeAddr = &pipes_[thread];
+
+  // Non-blocking send; fails when the thread's pipe is full.
+  if (qurt_pipe_try_send(pipeAddr, msg) != 0) {
+    // The command never reached a worker, so no one else will free it.
+    delete cmd;
+    return false;
+  }
+  return true;
+}
+
+void HexagonThreadManager::Start() {
+  // Signaling the start semaphore releases the workers parked on it by the constructor.
+  thread_signal(&start_semaphore_);
+}
+
+void HexagonThreadManager::WaitOnThreads() {
+  // Blocks the calling ("main") thread until every worker has run the commands queued before this
+  // call. One private semaphore per worker is used instead of a barrier, which would need extra
+  // single-use wait code.
+
+  // Workers still parked on the start semaphore would never reach our signal; release them.
+  if (qurt_sem_get_val(&start_semaphore_) == 0) {
+    Start();
+  }
+
+  std::vector<qurt_sem_t> done(nthreads_);
+  for (qurt_sem_t& sem : done) {
+    qurt_sem_init_val(&sem, 0);
+  }
+
+  // Append a signal on each worker's private semaphore to its queue, retrying while the pipe is
+  // full.
+  for (unsigned tid = 0; tid < nthreads_; ++tid) {
+    TVMStreamHandle handle = reinterpret_cast<TVMStreamHandle>(tid);
+    while (!Dispatch(handle, thread_signal, &done[tid])) {
+    }
+  }
+
+  // Each worker executes its commands one at a time in thread_main, so its signal fires only
+  // after the commands queued ahead of it; wait for all of them.
+  for (qurt_sem_t& sem : done) {
+    thread_wait(&sem);
+  }
+
+  for (qurt_sem_t& sem : done) {
+    qurt_sem_destroy(&sem);
+  }
+}
+
+void HexagonThreadManager::CheckSemaphore(unsigned syncID) {
+  // Lazily create the semaphore for `syncID` on first use. It starts at 0, so a Wait on it
+  // blocks until a Signal on the same ID runs.
+  if (semaphores_.count(syncID) != 0) {
+    return;
+  }
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(malloc(sizeof(qurt_sem_t)));
+  qurt_sem_init_val(sem, 0);
+  semaphores_[syncID] = sem;
+}
+
+bool HexagonThreadManager::Signal(TVMStreamHandle thread, SyncPoint syncID) {
+  // Make sure the semaphore for this ID exists, then queue a signal of it on `thread`.
+  CheckSemaphore(syncID);
+  qurt_sem_t* sem = semaphores_.at(syncID);
+  DLOG(INFO) << "Dispatching signal to thread " << thread << " on semaphore ID " << syncID
+             << " located @ 0x" << std::hex << sem;
+  return Dispatch(thread, thread_signal, sem);
+}
+
+bool HexagonThreadManager::Wait(TVMStreamHandle thread, SyncPoint syncID) {
+  // Make sure the semaphore for this ID exists, then queue a wait on it on `thread`.
+  CheckSemaphore(syncID);
+  qurt_sem_t* sem = semaphores_.at(syncID);
+  DLOG(INFO) << "Dispatching wait to thread " << thread << " on semaphore ID " << syncID
+             << " located @ 0x" << std::hex << sem;
+  return Dispatch(thread, thread_wait, sem);
+}
+
+/* Create a sync_from_to relationship with a dynamic semaphore allocation.
+Makes use of thread_wait_free to also free the semaphore after sync is complete.
+*/
+bool HexagonThreadManager::SyncFromTo(TVMStreamHandle signal_thread, TVMStreamHandle wait_thread) {
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(malloc(sizeof(qurt_sem_t)));
+  qurt_sem_init_val(sem, 0);
+  if (!Dispatch(signal_thread, thread_signal, sem)) {
+    // No queued command references `sem` yet, so release it here instead of leaking it.
+    qurt_sem_destroy(sem);
+    free(sem);
+    return false;
+  }
+  // NOTE(review): if this second dispatch fails, the already-queued signal still references
+  // `sem`, so it cannot be freed here and is leaked; callers should size pipes to avoid this.
+  return Dispatch(wait_thread, thread_wait_free, sem);
+}
+
+void HexagonThreadManager::thread_signal(void* semaphore) {
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(semaphore);
+  DLOG(INFO) << "Signaling semaphore addr 0x" << std::hex << semaphore;
+  // Add QURT_MAX_HTHREAD_LIMIT rather than 1, so a single signal can release up to that many
+  // waiters (e.g. every worker parked on the start semaphore).
+  qurt_sem_add(sem, QURT_MAX_HTHREAD_LIMIT);
+}
+
+void HexagonThreadManager::thread_wait(void* semaphore) {
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(semaphore);
+  DLOG(INFO) << "Waiting on semaphore addr 0x" << std::hex << semaphore;
+  // Blocks the calling thread until the semaphore has been signaled.
+  qurt_sem_down(sem);
+}
+
+/* Wait on the passed semaphore object, then free it. */
+void HexagonThreadManager::thread_wait_free(void* semaphore) {
+  qurt_sem_t* sem = static_cast<qurt_sem_t*>(semaphore);
+  qurt_sem_down(sem);  // blocks until the matching signal has been issued
+  // SyncFromTo allocates the semaphore with malloc and leaves its cleanup to this function.
+  qurt_sem_destroy(sem);
+  free(sem);
+}
+
+void HexagonThreadManager::thread_exit(void* status) {
+  DLOG(INFO) << "thread exiting";
+  // `status` carries an integer exit code in a pointer (the destructor passes nullptr, i.e. 0).
+  // Use named casts and an explicit narrowing instead of a C-style cast plus implicit conversion.
+  qurt_thread_exit(static_cast<int>(reinterpret_cast<intptr_t>(status)));
+}
+
+void HexagonThreadManager::thread_main(void* context) {
+  const ThreadContext* ctx = static_cast<const ThreadContext*>(context);
+  const unsigned index = ctx->index;
+  qurt_pipe_t* const pipe = ctx->pipe;
+
+  DLOG(INFO) << "Thread " << index << " spawned";
+
+  // Execute commands from the pipe one at a time, forever. The loop is left only when a
+  // dispatched `thread_exit` command terminates the thread from inside the call below.
+  for (;;) {
+    DLOG(INFO) << "Thread " << index << " receiving command";
+    // qurt_pipe_receive blocks while the pipe is empty.
+    Command* cmd = reinterpret_cast<Command*>(qurt_pipe_receive(pipe));
+    // Copy the payload out and free the Command before running it, since the call may not return.
+    const voidfunc fn = cmd->f;
+    void* const fn_args = cmd->args;
+    delete cmd;
+    fn(fn_args);
+  }
+}
+
+} // namespace hexagon
+} // namespace runtime
+} // namespace tvm
diff --git a/src/runtime/hexagon/hexagon_thread_manager.h b/src/runtime/hexagon/hexagon_thread_manager.h
new file mode 100644
index 0000000000..3422fef387
--- /dev/null
+++ b/src/runtime/hexagon/hexagon_thread_manager.h
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_
+#define TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_
+
+#include <tvm/runtime/c_runtime_api.h>
+#include <tvm/runtime/logging.h>
+#include <tvm/runtime/packed_func.h>
+
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "hexagon_buffer.h"
+#include "hexagon_buffer_manager.h"
+#include "hexagon_common.h"
+#include "qurt.h"
+
+namespace tvm {
+namespace runtime {
+namespace hexagon {
+
+/*!
+ * \brief Spawns a fixed pool of qurt threads, each serving commands from its own pipe, and
+ * exposes them as TVM stream handles.
+ */
+class HexagonThreadManager {
+  //! \brief Void function.
+  using voidfunc = void (*)(void*);
+  //! \brief Semaphore ID.
+  using SyncPoint = unsigned;
+  //! \brief Alignment of underlying memory allocations.
+  const unsigned MEM_ALIGNMENT = 32;
+  //! \brief Minimum stack size in bytes per thread.
+  const unsigned MIN_STACK_SIZE_BYTES = 0x400;  // 1KB
+  //! \brief Maximum stack size in bytes per thread.
+  const unsigned MAX_STACK_SIZE_BYTES = 0x10000;  // 64KB
+  //! \brief Minimum pipe (or command buffer) size in words (or commands) per thread.
+  const unsigned MIN_PIPE_SIZE_WORDS = 10;
+  //! \brief Maximum pipe (or command buffer) size in words (or commands) per thread.
+  const unsigned MAX_PIPE_SIZE_WORDS = 0x10000;  // 64K words
+
+ public:
+  /*!
+   * \brief Spawn a number of Hexagon threads with a given stack (in bytes) and pipe (a.k.a. command
+   * buffer; in words or commands) within the min and max values specified above. The threads do
+   * not execute dispatched work until `Start` (or `WaitOnThreads`) is called.
+   * \param num_threads Number of threads to spawn; must be in [1, QURT_MAX_HTHREAD_LIMIT].
+   * \param thread_stack_size_bytes Stack size in bytes per thread.
+   * \param thread_pipe_size_words Pipe (or command buffer) size in words (or commands).
+   */
+  HexagonThreadManager(unsigned num_threads, unsigned thread_stack_size_bytes,
+                       unsigned thread_pipe_size_words);
+
+  /*!
+   * \brief Destructor. Starts the threads if `Start` was never called, makes every thread exit,
+   * joins them and releases all semaphores, pipes, contexts and buffers.
+   */
+  ~HexagonThreadManager();
+
+  //! \brief Not copyable: running threads hold addresses of this object's pipes and semaphores,
+  //! and a copy's destructor would release the same resources a second time.
+  HexagonThreadManager(const HexagonThreadManager&) = delete;
+  HexagonThreadManager& operator=(const HexagonThreadManager&) = delete;
+
+  /*!
+   * \brief Get the spawned threads as stream handles.
+   * \returns Vector of stream handles.
+   */
+  const std::vector<TVMStreamHandle> GetStreamHandles();
+
+  /*!
+   * \brief Non-blocking dispatch of a void function and args on a given thread.
+   * \param thread Stream handle (from `GetStreamHandles`) of the thread on which to dispatch the
+   * void function.
+   * \param f Void function to be dispatched.
+   * \param args Arguments to pass to the void function.
+   * \returns Boolean value indicating success or failure of the dispatch; user must either 1)
+   * `Start` threads executing to clear space in the pipe before retrying dispatch or 2) create a
+   * `HexagonThreadManager` with a larger pipe.
+   */
+  bool Dispatch(TVMStreamHandle thread, voidfunc f, void* args);
+  /*!
+   * \brief Non-blocking signal of a semaphore with a given ID.
+   * \param thread Stream handle of the thread which will signal the semaphore.
+   * \param syncID ID of the semaphore to be signaled.
+   * \returns Boolean value indicating success or failure of the dispatch of the signal; user must
+   * either 1) `Start` threads executing to clear space in the pipe before retrying dispatch or 2)
+   * create a `HexagonThreadManager` with a larger pipe.
+   */
+  bool Signal(TVMStreamHandle thread, SyncPoint syncID);
+  /*!
+   * \brief Non-blocking wait on a semaphore with a given ID.
+   * \param thread Stream handle of the thread which will wait on the semaphore.
+   * \param syncID ID of the semaphore on which to wait.
+   * \returns Boolean value indicating success or failure of the dispatch of the wait; user must
+   * either 1) `Start` threads executing to clear space in the pipe before retrying dispatch or 2)
+   * create a `HexagonThreadManager` with a larger pipe.
+   */
+  bool Wait(TVMStreamHandle thread, SyncPoint syncID);
+  /*!
+   * \brief Creates a synchronization point between two threads by creating a semaphore,
+   * dispatching the `signal_thread` to signal that semaphore and dispatching the `wait_thread` to
+   * wait on that semaphore.
+   * \param signal_thread Stream handle for the thread which will signal the semaphore.
+   * \param wait_thread Stream handle for the thread which will wait on the semaphore.
+   * \returns Boolean value indicating success or failure of the combined dispatch of both the
+   * signal and the wait; user must either 1) `Start` threads executing to clear space in the pipe
+   * before retrying dispatch or 2) create a `HexagonThreadManager` with a larger pipe.
+   */
+  bool SyncFromTo(TVMStreamHandle signal_thread, TVMStreamHandle wait_thread);
+  //! \brief Unblock threads to start execution.
+  void Start();
+  //! \brief Unblock threads to start execution if `Start` has not already been called; blocking
+  //! call to wait until all threads have empty pipes.
+  void WaitOnThreads();
+
+ private:
+  //! \brief Per-thread data passed to `thread_main`: the thread's pipe and its index.
+  struct ThreadContext {
+    qurt_pipe_t* pipe;
+    unsigned index;
+    ThreadContext(qurt_pipe_t* pipe, unsigned index) : pipe(pipe), index(index) {}
+  };
+
+  //! \brief Helper function for the constructor to spawn threads.
+  void SpawnThreads(unsigned thread_stack_size_bytes, unsigned thread_pipe_size_words);
+
+  //! \brief Helper function for `Signal` and `Wait` to create, initialize and map semaphores by ID.
+  void CheckSemaphore(unsigned syncID);
+
+  //! \brief Void function executed by a thread to signal a semaphore.
+  static void thread_signal(void* semaphore);
+
+  //! \brief Void function executed by a thread to wait on a semaphore; used by `Wait`.
+  static void thread_wait(void* semaphore);
+
+  //! \brief Void function executed by a thread to wait on and free a semaphore; used by
+  //! `SyncFromTo`.
+  static void thread_wait_free(void* semaphore);
+
+  //! \brief Void function executed by a thread to exit at time of destruction.
+  static void thread_exit(void* status);
+
+  //! \brief Void function executed by each thread as `main`.
+  static void thread_main(void* context);
+
+  //! \brief Manages underlying HexagonBuffer allocations.
+  HexagonBufferManager hexbuffs_;
+
+  //! \brief Number of threads allocated.
+  unsigned nthreads_{0};
+
+  //! \brief Pointer to the base of the stacks allocated for all threads; size = `nthreads` *
+  //! `thread_stack_size_bytes`.
+  void* stack_buffer_{nullptr};
+
+  //! \brief Pointer to the base of the pipes (or command buffers) allocated for all threads; size =
+  //! `nthreads` * `thread_pipe_size_words` * sizeof(word).
+  void* pipe_buffer_{nullptr};
+
+  //! \brief QURT thread structure for each spawned thread.
+  std::vector<qurt_thread_t> threads_;
+
+  //! \brief QURT pipe (or command buffer) structure for each spawned thread.
+  std::vector<qurt_pipe_t> pipes_;
+
+  //! \brief Thread context passed into each `thread_main` function.
+  std::vector<ThreadContext*> contexts_;
+
+  //! \brief Semaphores used by `Signal` and `Wait` mapped by ID.
+  std::unordered_map<unsigned, qurt_sem_t*> semaphores_;
+
+  //! \brief Start semaphore created at time of construction; signaled by `Start`.
+  qurt_sem_t start_semaphore_;
+
+  /*!
+   *\brief Encapsulate a void function pointer + arg pointer; sent via pipe to threads to execute.
+   */
+  struct Command {
+    voidfunc f;
+    void* args;
+    Command(voidfunc f, void* args) : f(f), args(args) {}
+  };
+};
+
+} // namespace hexagon
+} // namespace runtime
+} // namespace tvm
+
+#endif // TVM_RUNTIME_HEXAGON_HEXAGON_THREAD_MANAGER_H_
diff --git a/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc b/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc
new file mode 100644
index 0000000000..aa86e4638d
--- /dev/null
+++ b/tests/cpp-runtime/hexagon/hexagon_thread_manager_tests.cc
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <tvm/runtime/logging.h>
+
+#include <memory>
+#include <vector>
+
+#include "../src/runtime/hexagon/hexagon_thread_manager.h"
+
+using namespace tvm::runtime;
+using namespace tvm::runtime::hexagon;
+
+class HexagonThreadManagerTest : public ::testing::Test {
+ protected:
+  // A fresh manager is constructed before every test and destroyed after it.
+  void SetUp() override {
+    htm = std::make_unique<HexagonThreadManager>(threads, stack_size, pipe_size);
+    streams = htm->GetStreamHandles();
+  }
+  void TearDown() override { htm.reset(); }
+
+  // Manager under test; owned by the fixture (no manual delete needed).
+  std::unique_ptr<HexagonThreadManager> htm;
+  // Stream handles reported by `htm->GetStreamHandles()`.
+  std::vector<TVMStreamHandle> streams;
+  // Output slot written by the work items the tests dispatch.
+  int answer{0};
+  const unsigned threads{6};
+  const unsigned pipe_size{100};
+  const unsigned stack_size{0x4000};  // 16KB
+};
+
+// Out-of-range constructor arguments must be rejected with InternalError.
+TEST_F(HexagonThreadManagerTest, ctor_errors) {
+  const unsigned kTooLarge = 0x10000000;  // far beyond any count/size the manager accepts
+  const unsigned kTooSmallPipe = 9;
+
+  // thread count: zero, then too many
+  ASSERT_THROW(HexagonThreadManager(0, stack_size, pipe_size), InternalError);
+  ASSERT_THROW(HexagonThreadManager(kTooLarge, stack_size, pipe_size), InternalError);
+
+  // stack size: zero, then too big
+  ASSERT_THROW(HexagonThreadManager(threads, 0, pipe_size), InternalError);
+  ASSERT_THROW(HexagonThreadManager(threads, kTooLarge, pipe_size), InternalError);
+
+  // pipe size: too small, then too big
+  ASSERT_THROW(HexagonThreadManager(threads, stack_size, kTooSmallPipe), InternalError);
+  ASSERT_THROW(HexagonThreadManager(threads, stack_size, kTooLarge), InternalError);
+}
+
+// The fixture's manager exists and reports one stream handle per requested thread.
+TEST_F(HexagonThreadManagerTest, init) {
+  ASSERT_TRUE(htm != nullptr);
+  EXPECT_EQ(streams.size(), threads);
+}
+
+//! \brief Work item: stores 42 into the int that `answer` points to.
+void get_the_answer(void* answer) { *static_cast<int*>(answer) = 42; }
+
+// Dispatch one work item to stream 0, start the threads explicitly, and wait for the result.
+TEST_F(HexagonThreadManagerTest, dispatch) {
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->Start();
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Like `dispatch`, but waits for completion without calling Start() first.
+TEST_F(HexagonThreadManagerTest, dispatch_wait) {
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Stream 0 waits on semaphore 0 ahead of its work item; stream 1 signals semaphore 0.
+TEST_F(HexagonThreadManagerTest, wait_signal) {
+  htm->Wait(streams[0], 0);
+  htm->Signal(streams[1], 0);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Semaphore 0 is signaled twice by stream 1; stream 0's work item must still run.
+TEST_F(HexagonThreadManagerTest, re_signal) {
+  htm->Wait(streams[0], 0);
+  htm->Signal(streams[1], 0);
+  htm->Signal(streams[1], 0);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Stream 0 waits on semaphore 0 twice while stream 1 signals it once; the work item must
+// still run.
+TEST_F(HexagonThreadManagerTest, re_wait) {
+  htm->Wait(streams[0], 0);
+  htm->Signal(streams[1], 0);
+  htm->Wait(streams[0], 0);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Two distinct semaphores (0 and 1), each waited on by stream 0 and signaled by stream 1.
+TEST_F(HexagonThreadManagerTest, wait_signal_x2) {
+  htm->Wait(streams[0], 0);
+  htm->Signal(streams[1], 0);
+  htm->Wait(streams[0], 1);
+  htm->Signal(streams[1], 1);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// The Signal on semaphore 0 is queued before the matching Wait.
+TEST_F(HexagonThreadManagerTest, signal_wait) {
+  htm->Signal(streams[1], 0);
+  htm->Wait(streams[0], 0);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// A SyncFromTo from stream 1 to stream 0 is queued ahead of stream 0's work item.
+TEST_F(HexagonThreadManagerTest, sync_from_to) {
+  htm->SyncFromTo(streams[1], streams[0]);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// SyncFromTo with the same stream as source and destination must still complete.
+TEST_F(HexagonThreadManagerTest, sync_from_to_self) {
+  htm->SyncFromTo(streams[0], streams[0]);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Syncs in both directions between streams 0 and 1 before stream 0's work item.
+TEST_F(HexagonThreadManagerTest, sync_from_to_x2) {
+  htm->SyncFromTo(streams[0], streams[1]);
+  htm->SyncFromTo(streams[1], streams[0]);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// A chain of syncs 5 -> 4 -> 3 -> 2 -> 1 -> 0 is queued ahead of stream 0's work item.
+TEST_F(HexagonThreadManagerTest, sync_from_to_all) {
+  htm->SyncFromTo(streams[5], streams[4]);
+  htm->SyncFromTo(streams[4], streams[3]);
+  htm->SyncFromTo(streams[3], streams[2]);
+  htm->SyncFromTo(streams[2], streams[1]);
+  htm->SyncFromTo(streams[1], streams[0]);
+  htm->Dispatch(streams[0], get_the_answer, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Queue `pipe_size` commands on stream 0, then let them all drain.
+TEST_F(HexagonThreadManagerTest, pipe_fill) {
+  // unsigned index: pipe_size is unsigned, so an int index triggers -Wsign-compare
+  for (unsigned i = 0; i < pipe_size; ++i) {
+    htm->Dispatch(streams[0], get_the_answer, &answer);
+  }
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 42);
+}
+
+// Queue `pipe_size` commands on stream 0 without calling Start(); one more Dispatch is
+// expected to report (by returning false) that the pipe has no room left.
+TEST_F(HexagonThreadManagerTest, pipe_overflow) {
+  // fill the pipe (unsigned index to match the unsigned pipe_size)
+  for (unsigned i = 0; i < pipe_size; ++i) {
+    htm->Dispatch(streams[0], get_the_answer, &answer);
+  }
+  // overflow the pipe
+  const bool accepted = htm->Dispatch(streams[0], get_the_answer, &answer);
+  EXPECT_FALSE(accepted);
+}
+
+//! \brief Work item: adds one to the int that `voidptr` points to.
+void increment(void* voidptr) {
+  int* intptr = static_cast<int*>(voidptr);
+  ++*intptr;
+}
+
+// Streams 5..0 each increment `answer`, with a SyncFromTo from each stream to the next lower
+// one after its increment; all six increments must be observed.
+TEST_F(HexagonThreadManagerTest, producer_consumer) {
+  htm->Dispatch(streams[5], increment, &answer);
+  htm->SyncFromTo(streams[5], streams[4]);
+  htm->Dispatch(streams[4], increment, &answer);
+  htm->SyncFromTo(streams[4], streams[3]);
+  htm->Dispatch(streams[3], increment, &answer);
+  htm->SyncFromTo(streams[3], streams[2]);
+  htm->Dispatch(streams[2], increment, &answer);
+  htm->SyncFromTo(streams[2], streams[1]);
+  htm->Dispatch(streams[1], increment, &answer);
+  htm->SyncFromTo(streams[1], streams[0]);
+  htm->Dispatch(streams[0], increment, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 6);
+}
+
+// Same chain as `producer_consumer`, expressed with semaphores: stream i (i < 5) waits on
+// semaphore i, which stream i + 1 signals after its increment.
+TEST_F(HexagonThreadManagerTest, producer_consumer_signal_wait) {
+  htm->Wait(streams[0], 0);
+  htm->Wait(streams[1], 1);
+  htm->Wait(streams[2], 2);
+  htm->Wait(streams[3], 3);
+  htm->Wait(streams[4], 4);
+
+  htm->Dispatch(streams[5], increment, &answer);
+  htm->Signal(streams[5], 4);
+  htm->Dispatch(streams[4], increment, &answer);
+  htm->Signal(streams[4], 3);
+  htm->Dispatch(streams[3], increment, &answer);
+  htm->Signal(streams[3], 2);
+  htm->Dispatch(streams[2], increment, &answer);
+  htm->Signal(streams[2], 1);
+  htm->Dispatch(streams[1], increment, &answer);
+  htm->Signal(streams[1], 0);
+  htm->Dispatch(streams[0], increment, &answer);
+  htm->WaitOnThreads();
+  EXPECT_EQ(answer, 6);
+}
+
+//! \brief Argument bundle for `append`: the destination vector and the value to push.
+struct ToAppend {
+  std::vector<int>* arr;  //!< Destination vector; not owned.
+  int value;              //!< Value pushed onto `*arr`.
+  ToAppend(std::vector<int>* addr, int value) : arr(addr), value(value) {}
+};
+
+//! \brief Work item: pushes `value` onto `*arr` of the ToAppend that `toappend` points to.
+void append(void* toappend) {
+  auto* cmd = static_cast<ToAppend*>(toappend);
+  cmd->arr->push_back(cmd->value);
+}
+
+// Streams 0..5 each append their index, chained with SyncFromTo(i, i + 1); the test expects
+// the vector to come out as 0, 1, ..., threads - 1.
+TEST_F(HexagonThreadManagerTest, thread_order) {
+  std::vector<int> arr;
+
+  ToAppend cmd0(&arr, 0);
+  htm->Dispatch(streams[0], append, &cmd0);
+  htm->SyncFromTo(streams[0], streams[1]);
+
+  ToAppend cmd1(&arr, 1);
+  htm->Dispatch(streams[1], append, &cmd1);
+  htm->SyncFromTo(streams[1], streams[2]);
+
+  ToAppend cmd2(&arr, 2);
+  htm->Dispatch(streams[2], append, &cmd2);
+  htm->SyncFromTo(streams[2], streams[3]);
+
+  ToAppend cmd3(&arr, 3);
+  htm->Dispatch(streams[3], append, &cmd3);
+  htm->SyncFromTo(streams[3], streams[4]);
+
+  ToAppend cmd4(&arr, 4);
+  htm->Dispatch(streams[4], append, &cmd4);
+  htm->SyncFromTo(streams[4], streams[5]);
+
+  ToAppend cmd5(&arr, 5);
+  htm->Dispatch(streams[5], append, &cmd5);
+  htm->WaitOnThreads();
+
+  // Check the size first: indexing arr[i] past arr.size() would be an out-of-bounds read.
+  ASSERT_EQ(arr.size(), threads);
+  for (unsigned i = 0; i < threads; ++i) {
+    EXPECT_EQ(arr[i], static_cast<int>(i));
+  }
+}
+
+// Same ordering check as `thread_order`, chained with semaphores: stream i (i >= 1) waits on
+// semaphore i, which stream i - 1 signals after appending its index.
+TEST_F(HexagonThreadManagerTest, thread_order_signal_wait) {
+  std::vector<int> arr;
+
+  htm->Wait(streams[1], 1);
+  htm->Wait(streams[2], 2);
+  htm->Wait(streams[3], 3);
+  htm->Wait(streams[4], 4);
+  htm->Wait(streams[5], 5);
+
+  ToAppend cmd0(&arr, 0);
+  htm->Dispatch(streams[0], append, &cmd0);
+  htm->Signal(streams[0], 1);
+
+  ToAppend cmd1(&arr, 1);
+  htm->Dispatch(streams[1], append, &cmd1);
+  htm->Signal(streams[1], 2);
+
+  ToAppend cmd2(&arr, 2);
+  htm->Dispatch(streams[2], append, &cmd2);
+  htm->Signal(streams[2], 3);
+
+  ToAppend cmd3(&arr, 3);
+  htm->Dispatch(streams[3], append, &cmd3);
+  htm->Signal(streams[3], 4);
+
+  ToAppend cmd4(&arr, 4);
+  htm->Dispatch(streams[4], append, &cmd4);
+  htm->Signal(streams[4], 5);
+
+  ToAppend cmd5(&arr, 5);
+  htm->Dispatch(streams[5], append, &cmd5);
+  htm->WaitOnThreads();
+
+  // Check the size first: indexing arr[i] past arr.size() would be an out-of-bounds read.
+  ASSERT_EQ(arr.size(), threads);
+  for (unsigned i = 0; i < threads; ++i) {
+    EXPECT_EQ(arr[i], static_cast<int>(i));
+  }
+}
+
+//! \brief Argument bundle for `thread_write_val`: where to write and what to write.
+struct ToWrite {
+  int* addr;  //!< Destination; not owned.
+  int value;  //!< Value stored to `*addr`.
+  ToWrite(int* addr, int value) : addr(addr), value(value) {}
+};
+
+//! \brief Work item: stores `value` to `*addr`, then deletes the ToWrite it was given.
+//! The argument must therefore have been allocated with `new`.
+void thread_write_val(void* towrite) {
+  auto* cmd = static_cast<ToWrite*>(towrite);
+  *(cmd->addr) = cmd->value;
+  delete cmd;
+}
+
+// Dispatch one heap-allocated ToWrite per stream before starting the threads, then check that
+// each stream wrote its value (2 * stream index) into its own slot.
+TEST_F(HexagonThreadManagerTest, dispatch_writes) {
+  std::vector<int> array(streams.size());
+  std::vector<int> truth(streams.size());
+  for (size_t i = 0; i < streams.size(); ++i) {
+    const int val = static_cast<int>(i) * 2;
+    auto* cmd = new ToWrite(&array[i], val);
+    if (!htm->Dispatch(streams[i], thread_write_val, cmd)) {
+      // Dispatch returning false presumably means the command was not queued (cf.
+      // pipe_overflow), so thread_write_val will never run to free it.
+      delete cmd;
+      FAIL() << "Dispatch failed for stream " << i;
+    }
+    truth[i] = val;
+  }
+  htm->Start();
+  htm->WaitOnThreads();
+  EXPECT_EQ(array, truth);
+}