You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by zh...@apache.org on 2019/03/15 02:17:01 UTC
[incubator-mxnet] branch master updated: Support multi-threading
for Custom Operator (#14363)
This is an automated email from the ASF dual-hosted git repository.
zhasheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push:
new 74c2274 Support multi-threading for Custom Operator (#14363)
74c2274 is described below
commit 74c227402ef0a9c7d227368aafea55f0c1d28311
Author: JackieWu <wk...@live.cn>
AuthorDate: Fri Mar 15 10:16:25 2019 +0800
Support multi-threading for Custom Operator (#14363)
* Support multi-threading for Custom Operator
* update
* Update custom-inl.h
---
docs/faq/env_var.md | 3 +++
src/operator/custom/custom-inl.h | 51 ++++++++++++++++++++++++++--------------
2 files changed, 37 insertions(+), 17 deletions(-)
diff --git a/docs/faq/env_var.md b/docs/faq/env_var.md
index 095c214..2768f64 100644
--- a/docs/faq/env_var.md
+++ b/docs/faq/env_var.md
@@ -60,6 +60,9 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
* MXNET_MP_OPENCV_NUM_THREADS
- Values: Int ```(default=0)```
- The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarge this number may boost the performance of individual workers when executing underlying OpenCV functions but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
+* MXNET_CUSTOM_OP_NUM_THREADS
+ - Values: Int ```(default=16)```
+ - The maximum number of threads given to custom operators.
## Memory Options
diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h
index de82f3e..f88e830 100644
--- a/src/operator/custom/custom-inl.h
+++ b/src/operator/custom/custom-inl.h
@@ -31,6 +31,7 @@
#include <mxnet/operator.h>
#include <mxnet/c_api.h>
#include <mxnet/imperative.h>
+#include <algorithm>
#include <map>
#include <vector>
#include <string>
@@ -129,6 +130,9 @@ class CustomOperator {
ctx.run_ctx.ctx, vars, vars2, FnProperty::kNormal, 0,
"CustomOperator");
});
+ // increase num_threads if there is not enough threads to execute custom operator
+ if (q_.size() > num_free_threads)
+ CreateThreads(q_.size() - num_free_threads);
cv_.notify_all();
}
@@ -139,38 +143,51 @@ class CustomOperator {
destructing_ = true;
cv_.notify_all();
}
- worker_.join();
+ for (auto &worker : workers_)
+ worker.join();
}
static CustomOperator* Get();
private:
- CustomOperator() {
+ CustomOperator() : num_free_threads(0) {
destructing_ = false;
naive_engine_ = true;
if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) {
naive_engine_ = false;
- worker_ = std::thread(
- [&]() {
- std::unique_lock<std::mutex> lock(mutex_);
- while (!q_.empty() || !destructing_) {
- cv_.wait(lock, [&] {return !q_.empty() || destructing_;});
- while (!q_.empty()) {
- auto fn = q_.front();
- lock.unlock();
- fn();
- lock.lock();
- q_.pop();
- }
- }
- });
}
}
+ void ThreadTarget() {
+ std::unique_lock<std::mutex> lock(mutex_);
+ while (!q_.empty() || !destructing_) {
+ cv_.wait(lock, [&] {return !q_.empty() || destructing_;});
+ while (!q_.empty()) {
+ --num_free_threads;
+ auto fn = q_.front();
+ q_.pop();
+ lock.unlock();
+ fn();
+ ++num_free_threads;
+ lock.lock();
+ }
+ }
+ }
+ void SetNumThreads(int num_threads) {
+ num_threads = std::min(dmlc::GetEnv("MXNET_CUSTOM_OP_NUM_THREADS", 16), num_threads);
+ for (int i = workers_.size(); i < num_threads; ++i) {
+ workers_.emplace_back(std::thread([this]{this->ThreadTarget();}));
+ ++num_free_threads;
+ }
+ }
+ void CreateThreads(int num_new_threads) {
+ SetNumThreads(workers_.size() + num_new_threads);
+ }
std::mutex mutex_;
std::map<std::string, CustomOpPropCreator> registry_;
// async worker
std::condition_variable cv_;
- std::thread worker_;
+ std::vector<std::thread> workers_;
+ std::atomic<uint32_t> num_free_threads;
std::queue<std::function<void(void)> > q_;
bool naive_engine_;
bool destructing_;