Posted to commits@mxnet.apache.org by GitBox <gi...@apache.org> on 2021/09/02 18:01:07 UTC

[GitHub] [incubator-mxnet] barry-jin commented on a change in pull request #20331: [WIP] Add async GPU dependency Engine

barry-jin commented on a change in pull request #20331:
URL: https://github.com/apache/incubator-mxnet/pull/20331#discussion_r701285974



##########
File path: src/engine/threaded_engine.cc
##########
@@ -523,5 +543,206 @@ void ThreadedEngine::OnCompleteStatic(Engine *engine, void *opr_block_,
   OprBlock::Delete(opr_block);
 }
 
+void ThreadedEngine::OnStartStatic(Engine *engine, void *opr_block,
+                                   const dmlc::Error* error) {
+  // no-op
+}
+
+#if MXNET_USE_CUDA
+static inline void AddEventHelper(
+  std::unordered_map<cudaStream_t, EventInfo>* events_per_stream,
+  const EventInfo& cuda_event) {
+  auto event_stream = cuda_event.stream;
+  if (events_per_stream->count(event_stream) > 0) {
+    if ((*events_per_stream)[event_stream].pool_index < cuda_event.pool_index) {
+      (*events_per_stream)[event_stream] = cuda_event;
+    }
+  } else {
+    (*events_per_stream).emplace(event_stream, cuda_event);
+  }
+}
+
+void ThreadedEngine::OnStartCPU(Engine *engine, void *opr_block,
+                        const dmlc::Error* error) {
+  static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false);
+  if (!use_new_dep_engine) {
+    return;
+  }
+  ThreadedOpr *threaded_opr = static_cast<OprBlock*>(opr_block)->opr;
+  std::unordered_map<cudaStream_t, EventInfo> event_per_stream;
+  for (auto* read_var : threaded_opr->const_vars) {
+    auto &sync_obj = read_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    auto &reader_events = sync_obj.reader_events;
+    // check for expired events and delete them
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    for (auto& cuda_event : reader_events) {
+      AddEventHelper(&event_per_stream, cuda_event);
+    }
+    if (!sync_obj.writer_event.empty()) {
+      if (sync_obj.writer_event[0].event.expired()) {
+        sync_obj.writer_event.clear();
+      } else {
+        AddEventHelper(&event_per_stream, sync_obj.writer_event[0]);
+      }
+    }
+  }
+
+  for (auto* write_var : threaded_opr->mutable_vars) {
+    auto &sync_obj = write_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    auto &reader_events = sync_obj.reader_events;
+    // check for expired events and delete them
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    for (auto& cuda_event : reader_events) {
+        AddEventHelper(&event_per_stream, cuda_event);
+    }
+    if (!sync_obj.writer_event.empty()) {
+      if (sync_obj.writer_event[0].event.expired()) {
+        sync_obj.writer_event.clear();
+      } else {
+        AddEventHelper(&event_per_stream, sync_obj.writer_event[0]);
+      }
+    }
+  }
+  for (auto event : event_per_stream) {
+    auto ev = event.second.event.lock();
+    MSHADOW_CUDA_CALL(cudaEventSynchronize(*ev));
+  }
+}
+
+void ThreadedEngine::OnStartGPU(Engine *engine, void *sync_info,
+                        const dmlc::Error* error) {
+  static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false);
+  if (!use_new_dep_engine) {
+    return;
+  }
+  auto *info = reinterpret_cast<GPUWorkerSyncInfo *>(sync_info);
+  CHECK(info->stream != nullptr);
+  auto *worker_stream = reinterpret_cast<mshadow::Stream<gpu> *>(info->stream);
+  ThreadedOpr *threaded_opr = static_cast<OprBlock*>(info->opr_block)->opr;
+  std::unordered_map<cudaStream_t, EventInfo> event_per_stream;
+  for (auto* read_var : threaded_opr->const_vars) {
+    auto &sync_obj = read_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    auto &reader_events = sync_obj.reader_events;
+    // check for expired events and delete them
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    for (auto& writer : sync_obj.writer_event) {
+      if (writer.event.expired()) {
+        sync_obj.writer_event.clear();
+        break;
+      }
+      if (writer.stream != worker_stream->stream_) {
+        // if there is already a reader on the same stream as us,
+        // it already synced with that writer and we can rely on
+        // the ongoing sync
+        bool found = false;
+        for (const auto& reader : reader_events) {
+          if (reader.stream == worker_stream->stream_) {
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          AddEventHelper(&event_per_stream,
+                          writer);
+        }
+      }
+    }
+  }
+  for (auto* write_var : threaded_opr->mutable_vars) {
+    auto &sync_obj = write_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    // check for expired events and delete them
+    auto &reader_events = sync_obj.reader_events;
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    // if there are some readers, we wait for them
+    for (auto& cuda_event : reader_events) {
+      if (worker_stream->stream_ != cuda_event.stream) {
+        AddEventHelper(&event_per_stream, cuda_event);
+      }
+    }
+    if (!sync_obj.writer_event.empty()) {
+      if (sync_obj.writer_event[0].event.expired()) {
+        sync_obj.writer_event.clear();

Review comment:
       Could you elaborate on why all the writer events should be removed when the first writer event expires?
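
For context on the pattern in question: the quoted code keeps non-owning handles to pooled CUDA events, so `event.expired()` reports that the owning pool has already retired that event. Below is a minimal standalone sketch of that ownership pattern, assuming plain C++; `Event` and `EventRef` are hypothetical stand-ins for the pooled `cudaEvent_t` and `EventInfo`, not the PR's actual types.

```cpp
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-ins for the pooled CUDA event and its per-var record.
struct Event { int id; };

struct EventRef {
  std::weak_ptr<Event> event;  // non-owning: the pool owns the event
  size_t pool_index;
};

int main() {
  std::vector<EventRef> writer_events;
  {
    // The "pool" owns the event; the variable only keeps a weak reference.
    auto pooled = std::make_shared<Event>(Event{0});
    writer_events.push_back({pooled, 0});
    std::cout << "expired while pool holds it: "
              << writer_events[0].event.expired() << "\n";  // prints 0
  }  // the pool releases the event here
  std::cout << "expired after pool released it: "
            << writer_events[0].event.expired() << "\n";    // prints 1

  // The quoted diff drops the whole writer_event list once the front entry
  // has expired; whether that is always safe is exactly what the review
  // comment above asks.
  if (!writer_events.empty() && writer_events[0].event.expired()) {
    writer_events.clear();
  }
  return 0;
}
```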

##########
File path: src/engine/threaded_engine.cc
##########
@@ -523,5 +543,206 @@ void ThreadedEngine::OnCompleteStatic(Engine *engine, void *opr_block_,
   OprBlock::Delete(opr_block);
 }
 
+void ThreadedEngine::OnStartStatic(Engine *engine, void *opr_block,
+                                   const dmlc::Error* error) {
+  // no-op
+}
+
+#if MXNET_USE_CUDA
+static inline void AddEventHelper(
+  std::unordered_map<cudaStream_t, EventInfo>* events_per_stream,
+  const EventInfo& cuda_event) {
+  auto event_stream = cuda_event.stream;
+  if (events_per_stream->count(event_stream) > 0) {
+    if ((*events_per_stream)[event_stream].pool_index < cuda_event.pool_index) {
+      (*events_per_stream)[event_stream] = cuda_event;
+    }
+  } else {
+    (*events_per_stream).emplace(event_stream, cuda_event);
+  }
+}
+
+void ThreadedEngine::OnStartCPU(Engine *engine, void *opr_block,
+                        const dmlc::Error* error) {
+  static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false);
+  if (!use_new_dep_engine) {
+    return;
+  }
+  ThreadedOpr *threaded_opr = static_cast<OprBlock*>(opr_block)->opr;
+  std::unordered_map<cudaStream_t, EventInfo> event_per_stream;
+  for (auto* read_var : threaded_opr->const_vars) {
+    auto &sync_obj = read_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    auto &reader_events = sync_obj.reader_events;
+    // check for expired events and delete them
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    for (auto& cuda_event : reader_events) {
+      AddEventHelper(&event_per_stream, cuda_event);
+    }
+    if (!sync_obj.writer_event.empty()) {
+      if (sync_obj.writer_event[0].event.expired()) {
+        sync_obj.writer_event.clear();
+      } else {
+        AddEventHelper(&event_per_stream, sync_obj.writer_event[0]);
+      }
+    }
+  }
+
+  for (auto* write_var : threaded_opr->mutable_vars) {
+    auto &sync_obj = write_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    auto &reader_events = sync_obj.reader_events;
+    // check for expired events and delete them
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    for (auto& cuda_event : reader_events) {
+        AddEventHelper(&event_per_stream, cuda_event);
+    }
+    if (!sync_obj.writer_event.empty()) {
+      if (sync_obj.writer_event[0].event.expired()) {
+        sync_obj.writer_event.clear();
+      } else {
+        AddEventHelper(&event_per_stream, sync_obj.writer_event[0]);
+      }
+    }
+  }
+  for (auto event : event_per_stream) {
+    auto ev = event.second.event.lock();
+    MSHADOW_CUDA_CALL(cudaEventSynchronize(*ev));
+  }
+}
+
+void ThreadedEngine::OnStartGPU(Engine *engine, void *sync_info,
+                        const dmlc::Error* error) {
+  static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false);
+  if (!use_new_dep_engine) {
+    return;
+  }
+  auto *info = reinterpret_cast<GPUWorkerSyncInfo *>(sync_info);
+  CHECK(info->stream != nullptr);
+  auto *worker_stream = reinterpret_cast<mshadow::Stream<gpu> *>(info->stream);
+  ThreadedOpr *threaded_opr = static_cast<OprBlock*>(info->opr_block)->opr;
+  std::unordered_map<cudaStream_t, EventInfo> event_per_stream;
+  for (auto* read_var : threaded_opr->const_vars) {
+    auto &sync_obj = read_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    auto &reader_events = sync_obj.reader_events;
+    // check for expired events and delete them
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    for (auto& writer : sync_obj.writer_event) {
+      if (writer.event.expired()) {
+        sync_obj.writer_event.clear();
+        break;
+      }
+      if (writer.stream != worker_stream->stream_) {
+        // if there is already a reader on the same stream as us,
+        // it already synced with that writer and we can rely on
+        // the ongoing sync
+        bool found = false;
+        for (const auto& reader : reader_events) {
+          if (reader.stream == worker_stream->stream_) {
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          AddEventHelper(&event_per_stream,
+                          writer);
+        }
+      }
+    }
+  }
+  for (auto* write_var : threaded_opr->mutable_vars) {
+    auto &sync_obj = write_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    // check for expired events and delete them
+    auto &reader_events = sync_obj.reader_events;
+    reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(),
+                                       [&](const EventInfo e_i) {
+                                          return e_i.event.expired();
+                                       }), reader_events.end());
+    // if there are some readers, we wait for them
+    for (auto& cuda_event : reader_events) {
+      if (worker_stream->stream_ != cuda_event.stream) {
+        AddEventHelper(&event_per_stream, cuda_event);
+      }
+    }
+    if (!sync_obj.writer_event.empty()) {
+      if (sync_obj.writer_event[0].event.expired()) {
+        sync_obj.writer_event.clear();
+      } else {
+        if (worker_stream->stream_ != sync_obj.writer_event[0].stream) {
+          AddEventHelper(&event_per_stream, sync_obj.writer_event[0]);
+        }
+      }
+    }
+  }
+  for (auto event : event_per_stream) {
+    auto ev = event.second.event.lock();
+    MSHADOW_CUDA_CALL(cudaStreamWaitEvent(worker_stream->stream_, *ev, 0));
+  }
+}
+
+void ThreadedEngine::OnCompleteGPU(Engine *engine, void *sync_info,
+                          const dmlc::Error* error) {
+  auto *info = reinterpret_cast<GPUWorkerSyncInfo *>(sync_info);
+  CHECK(info->stream != nullptr);
+
+  auto *worker_stream = reinterpret_cast<mshadow::Stream<gpu> *>(info->stream);
+  static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false);
+
+  if (!use_new_dep_engine) {
+    worker_stream->Wait();
+    ThreadedEngine::OnCompleteStatic(engine, info->opr_block, error);
+    GPUWorkerSyncInfo::Delete(info);
+    return;
+  }
+
+  ThreadedOpr *threaded_opr = static_cast<OprBlock*>(info->opr_block)->opr;
+  auto* event_pool = static_cast<CUDAEventPool*>(info->event_pool);
+  auto[event, event_pool_idx] = event_pool->GetNextEvent();
+  auto ev = event.lock();
+  MSHADOW_CUDA_CALL(cudaEventRecord(*ev, worker_stream->stream_));
+  for (auto* read_var : threaded_opr->const_vars) {
+    auto &sync_obj = read_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    // If some reader event is already recorded on the same stream,
+    // we want to replace ourselves by it
+    int i;
+    for (i = 0; i < sync_obj.reader_events.size(); ++i) {
+      auto stream = sync_obj.reader_events[i].stream;
+      if (stream == worker_stream->stream_) {
+        sync_obj.reader_events[i].event = event;
+        sync_obj.reader_events[i].pool_index = event_pool_idx;
+        break;
+      }
+    }
+    if (i == sync_obj.reader_events.size()) {
+      sync_obj.reader_events.push_back({event, worker_stream->stream_, event_pool_idx});
+    }
+  }
+
+  for (auto* write_var : threaded_opr->mutable_vars) {
+    auto &sync_obj = write_var->sync_object;
+    std::lock_guard<std::mutex> l(sync_obj.mutex);
+    sync_obj.reader_events.clear();
+    sync_obj.writer_event.clear();
+    sync_obj.writer_event.push_back({event, worker_stream->stream_, event_pool_idx});

Review comment:
       This probably answers my last question. But what is the purpose of clearing all the reader and writer events? Is it because the current event should synchronize with the cleared reader and writer events?
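
One possible reading of the invariant being asked about, shown as a standalone sketch (simplified stand-in types, not MXNet's actual API, and the stated rationale is an assumption about the intent, not something the diff spells out): after a completed write, later ops touching the variable would only need the writer's event, because the writing op had to wait on every earlier reader/writer event before it launched.

```cpp
#include <memory>
#include <vector>

using Stream = int;   // placeholder for cudaStream_t
struct Event { };

struct EventRef {
  std::weak_ptr<Event> event;
  Stream stream;
  size_t pool_index;
};

struct VarSyncState {
  std::vector<EventRef> reader_events;
  std::vector<EventRef> writer_event;  // at most one live entry
};

// After an op that only read the var: keep one event per stream, refreshing
// an existing same-stream entry instead of appending a duplicate.
void OnReadDone(VarSyncState* s, const EventRef& ev) {
  for (auto& r : s->reader_events) {
    if (r.stream == ev.stream) { r = ev; return; }
  }
  s->reader_events.push_back(ev);
}

// After an op that wrote the var: presumably the new writer event dominates
// every earlier reader/writer event (the writing op waited on them before
// starting), so the old entries can be dropped.  That presumption is the
// point the review comment is probing.
void OnWriteDone(VarSyncState* s, const EventRef& ev) {
  s->reader_events.clear();
  s->writer_event.clear();
  s->writer_event.push_back(ev);
}

int main() {
  VarSyncState var;
  auto pooled = std::make_shared<Event>();
  OnReadDone(&var, {pooled, /*stream=*/0, /*pool_index=*/0});
  OnWriteDone(&var, {pooled, /*stream=*/0, /*pool_index=*/1});
  return 0;
}
```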

##########
File path: src/engine/threaded_engine.cc
##########
@@ -523,5 +543,206 @@ void ThreadedEngine::OnCompleteStatic(Engine *engine, void *opr_block_,
   OprBlock::Delete(opr_block);
 }
 
+void ThreadedEngine::OnStartStatic(Engine *engine, void *opr_block,
+                                   const dmlc::Error* error) {
+  // no-op
+}
+
+#if MXNET_USE_CUDA
+static inline void AddEventHelper(
+  std::unordered_map<cudaStream_t, EventInfo>* events_per_stream,
+  const EventInfo& cuda_event) {
+  auto event_stream = cuda_event.stream;
+  if (events_per_stream->count(event_stream) > 0) {
+    if ((*events_per_stream)[event_stream].pool_index < cuda_event.pool_index) {
+      (*events_per_stream)[event_stream] = cuda_event;

Review comment:
       Why should the event be replaced by another one with a larger pool_index?
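
For reference, the CUDA ordering property that would make "keep only the newest event per stream" sufficient is that work on a single stream completes in issue order, so waiting on a later event recorded on a stream also covers earlier events on that same stream. The sketch below demonstrates that property with the plain CUDA runtime API; it assumes pool_index grows in the order events are recorded, which is an assumption about CUDAEventPool rather than something the quoted diff states explicitly.

```cpp
#include <cuda_runtime.h>
#include <cstdio>

int main() {
  char* d = nullptr;
  cudaMalloc(reinterpret_cast<void**>(&d), 1 << 20);
  cudaStream_t s;
  cudaStreamCreate(&s);

  cudaEvent_t early, late;
  cudaEventCreate(&early);
  cudaEventCreate(&late);

  cudaMemsetAsync(d, 0, 1 << 20, s);
  cudaEventRecord(early, s);   // captures the first memset
  cudaMemsetAsync(d, 1, 1 << 20, s);
  cudaEventRecord(late, s);    // captures everything issued so far on s

  // Work on a single stream completes in issue order, so once `late` is
  // done, `early`'s work is done as well.  Waiting only on the most
  // recently recorded event per stream therefore covers the older ones.
  cudaEventSynchronize(late);
  printf("early already done: %d\n", cudaEventQuery(early) == cudaSuccess);

  cudaEventDestroy(early);
  cudaEventDestroy(late);
  cudaStreamDestroy(s);
  cudaFree(d);
  return 0;
}
```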

##########
File path: src/engine/threaded_engine_perdevice.cc
##########
@@ -103,7 +118,11 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
         MSHADOW_CATCH_ERROR(mshadow::SetDevice<gpu>(ctx.dev_id));
         #endif
       }
-      this->ExecuteOprBlock(RunContext{ctx, nullptr, nullptr, false}, opr_block);
+      CallbackOnStart on_start = this->CreateOnStart(ThreadedEngine::OnStartStatic,
+                                                     opr_block);
+      CallbackOnComplete callback = this->CreateCallback(ThreadedEngine::OnCompleteStatic,
+                                                         opr_block);

Review comment:
       Why use OnStartStatic and OnCompleteStatic here, instead of creating the GPU on-start and on-complete callbacks for the GPU case and the static on-start and on-complete callbacks for the CPU case?
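
To make the suggested alternative concrete, here is a standalone sketch of picking device-specific start/complete callbacks at scheduling time. All names here are illustrative placeholders, not MXNet's actual types or the PR's code; in the quoted diff the GPU path would additionally carry the worker stream and event pool alongside the opr_block (the role of GPUWorkerSyncInfo).

```cpp
#include <functional>
#include <iostream>

enum class DevMask { kCPU, kGPU };

struct Callbacks {
  std::function<void()> on_start;
  std::function<void()> on_complete;
};

// Hypothetical dispatch: device-specific callbacks for GPU work, the plain
// static pair for CPU work.
Callbacks MakeCallbacks(DevMask dev) {
  if (dev == DevMask::kGPU) {
    return {[] { std::cout << "OnStartGPU: wait on dependency events\n"; },
            [] { std::cout << "OnCompleteGPU: record event, update vars\n"; }};
  }
  return {[] { std::cout << "OnStartStatic: no-op\n"; },
          [] { std::cout << "OnCompleteStatic: mark op done\n"; }};
}

int main() {
  for (DevMask dev : {DevMask::kCPU, DevMask::kGPU}) {
    Callbacks cb = MakeCallbacks(dev);
    cb.on_start();
    // ... execute the operator block here ...
    cb.on_complete();
  }
  return 0;
}
```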




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@mxnet.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org