Posted to commits@mxnet.apache.org by ap...@apache.org on 2019/02/06 22:02:54 UTC

[incubator-mxnet] branch master updated: Increase performance of BulkAppend and BulkFlush (#14067)

This is an automated email from the ASF dual-hosted git repository.

apeforest pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new 149d8105 Increase performance of BulkAppend and BulkFlush (#14067)
149d8105 is described below

commit 149d8105ea3c4dfd63c3c7e25b3be1e4c4f2ec45
Author: Przemyslaw Tredak <pt...@gmail.com>
AuthorDate: Wed Feb 6 14:02:32 2019 -0800

    Increase performance of BulkAppend and BulkFlush (#14067)
    
    * Better bulkappend
    
    * Fix lint
---
 src/engine/threaded_engine.h | 27 ++++++++++++++++-----------
 1 file changed, 16 insertions(+), 11 deletions(-)
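
At a glance, the change replaces closure chaining with a flat list of functions: previously each BulkAppend wrapped the accumulated closure in a new lambda, so the captured state grew and the call chain nested one level deeper with every appended op, whereas now each op is a single push_back into a pre-reserved vector that BulkFlush simply iterates. A minimal standalone sketch of the before/after pattern follows; it is illustrative only, with RunContext stubbed and the helper names (AppendChained, AppendVector, FlushVector) invented for the example rather than taken from the ThreadedEngine API.

    // Illustrative sketch only; not the actual ThreadedEngine code.
    #include <functional>
    #include <utility>
    #include <vector>

    struct RunContext {};
    using SyncFn = std::function<void(RunContext)>;

    // Before this commit: each append wraps the previous closure in a new
    // one, copying the accumulated captures and deepening the call chain.
    void AppendChained(SyncFn* bulk_fn, SyncFn exec_fn) {
      if (!*bulk_fn) {
        *bulk_fn = std::move(exec_fn);
      } else {
        SyncFn prev_fn = std::move(*bulk_fn);
        *bulk_fn = [exec_fn, prev_fn](RunContext rctx) {
          prev_fn(rctx);
          exec_fn(rctx);
        };
      }
    }

    // After this commit: appending is one push_back into a pre-reserved
    // vector, and flushing is a flat loop over it.
    void AppendVector(std::vector<SyncFn>* fns, SyncFn exec_fn) {
      fns->push_back(std::move(exec_fn));
    }

    void FlushVector(const std::vector<SyncFn>& fns, RunContext rctx) {
      for (const auto& fn : fns) fn(rctx);
    }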

diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h
index 18018cb..4a2d419 100644
--- a/src/engine/threaded_engine.h
+++ b/src/engine/threaded_engine.h
@@ -403,6 +403,10 @@ class ThreadedEngine : public Engine {
     BulkStatus& bulk_status = *BulkStatusStore::Get();
     std::swap(bulk_status.bulk_size, bulk_size);
     if (bulk_status.count >= bulk_status.bulk_size) BulkFlush();
+    if (!bulk_status.functions) {
+      bulk_status.functions.reset(new std::vector<SyncFn>());
+    }
+    bulk_status.functions->reserve(bulk_size);
     return bulk_size;
   }
 
@@ -416,7 +420,7 @@ class ThreadedEngine : public Engine {
     /*! \brief context of current ops */
     Context ctx;
     /*! \brief current op functions */
-    SyncFn fn;
+    std::shared_ptr<std::vector<SyncFn>> functions;
     /*! \brief constant variables */
     std::vector<VarHandle> const_vars;
     /*! \brief mutable variables */
@@ -472,15 +476,12 @@ class ThreadedEngine : public Engine {
                          std::vector<VarHandle> const& const_vars,
                          std::vector<VarHandle> const& mutable_vars) {
     BulkStatus& bulk_status = *BulkStatusStore::Get();
+    if (!bulk_status.functions) {
+      bulk_status.functions.reset(new std::vector<SyncFn>());
+    }
+    bulk_status.functions->push_back(exec_fn);
     if (!bulk_status.count) {
       bulk_status.ctx = exec_ctx;
-      bulk_status.fn = std::move(exec_fn);
-    } else {
-      auto prev_fn = std::move(bulk_status.fn);
-      bulk_status.fn = [exec_fn, prev_fn](RunContext rctx) {
-          prev_fn(rctx);
-          exec_fn(rctx);
-        };
     }
 
     ++bulk_status.count;
@@ -497,10 +498,12 @@ class ThreadedEngine : public Engine {
     if (!bulk_status.count) return;
     bulk_status.count = 0;
     DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars);
-    SyncFn fn = std::move(bulk_status.fn);
-    this->PushAsync([fn](RunContext ctx, CallbackOnComplete on_complete) {
+    auto functions = bulk_status.functions;
+    this->PushAsync([functions](RunContext ctx, CallbackOnComplete on_complete) {
         ctx.is_bulk = true;
-        fn(ctx);
+        for (auto& fn : *functions) {
+          fn(ctx);
+        }
         ctx.is_bulk = false;
         bool is_gpu = ctx.ctx.dev_mask() == gpu::kDevMask;
         if (is_gpu) {
@@ -510,6 +513,8 @@ class ThreadedEngine : public Engine {
       }, bulk_status.ctx, bulk_status.const_vars, bulk_status.mutable_vars,
       FnProperty::kNormal, 0, "ImperativeBulk");
 
+    bulk_status.functions.reset(new std::vector<SyncFn>());
+    bulk_status.functions->reserve(bulk_status.bulk_size);
     bulk_status.const_vars.clear();
     bulk_status.mutable_vars.clear();
   }
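
Two details in the new code are worth noting. The list of pending functions is held through a std::shared_ptr, so the lambda handed to PushAsync keeps the batched SyncFns alive even though BulkFlush immediately resets bulk_status.functions to a fresh vector for the next bulk. And the vector is reserve()d to the bulk size wherever it is (re)created, so each BulkAppend is a plain push_back with no reallocation in the typical case.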