You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this copy.
Posted to commits@mxnet.apache.org by zh...@apache.org on 2018/05/09 04:12:01 UTC

[incubator-mxnet] branch master updated: fix thread contention caused by openmp (#10820)

This is an automated email from the ASF dual-hosted git repository.

zhasheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d5560d  fix thread contention caused by openmp (#10820)
0d5560d is described below

commit 0d5560df88629e790f63463c63018ed61bee4030
Author: Joshua Z. Zhang <ch...@gmail.com>
AuthorDate: Tue May 8 21:11:53 2018 -0700

    fix thread contention caused by openmp (#10820)
    
    * fix thread contention caused by openmp
    
    * fix namespace
    
    * fix relative include
---
 python/mxnet/gluon/data/dataloader.py   | 19 +++++++++++++++++--
 src/engine/threaded_engine_perdevice.cc | 16 ----------------
 src/initialize.cc                       | 19 +++++++++++++++++++
 3 files changed, 36 insertions(+), 18 deletions(-)

diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py
index 7f09e28..7ef18bd 100644
--- a/python/mxnet/gluon/data/dataloader.py
+++ b/python/mxnet/gluon/data/dataloader.py
@@ -83,6 +83,21 @@ class Queue(multiprocessing.queues.Queue):
         self._recv = self._reader.recv
 
 
+class SimpleQueue(multiprocessing.queues.SimpleQueue):
+    """Wrapper for multiprocessing SimpleQueue that dumps NDArray with shared memory.
+       SimpleQueue don't use threading internally.
+    """
+    def __init__(self, *args, **kwargs):
+        if sys.version_info[0] <= 2:
+            super(SimpleQueue, self).__init__(*args, **kwargs)
+        else:
+            super(SimpleQueue, self).__init__(*args, ctx=multiprocessing.get_context(),
+                                              **kwargs)
+        self._reader = ConnectionWrapper(self._reader)
+        self._writer = ConnectionWrapper(self._writer)
+        self._send = self._writer.send
+        self._recv = self._reader.recv
+
 def default_batchify_fn(data):
     """Collate data into batch."""
     if isinstance(data[0], nd.NDArray):
@@ -128,7 +143,7 @@ class _MultiWorkerIter(object):
         self._batchify_fn = batchify_fn
         self._batch_sampler = batch_sampler
         self._key_queue = Queue()
-        self._data_queue = Queue(2*self._num_workers)
+        self._data_queue = SimpleQueue()
         self._data_buffer = {}
         self._rcvd_idx = 0
         self._sent_idx = 0
@@ -170,10 +185,10 @@ class _MultiWorkerIter(object):
             raise StopIteration
 
         while True:
+            self._push_next()
             if self._rcvd_idx in self._data_buffer:
                 batch = self._data_buffer.pop(self._rcvd_idx)
                 self._rcvd_idx += 1
-                self._push_next()
                 return batch
             idx, batch = self._data_queue.get()
             self._data_buffer[idx] = batch
diff --git a/src/engine/threaded_engine_perdevice.cc b/src/engine/threaded_engine_perdevice.cc
index a822788..2f77380 100644
--- a/src/engine/threaded_engine_perdevice.cc
+++ b/src/engine/threaded_engine_perdevice.cc
@@ -53,22 +53,6 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
 
   ThreadedEnginePerDevice() noexcept(false) {
     this->Start();
-#ifndef _WIN32
-    pthread_atfork(
-      []() {
-        Engine::Get()->Stop();
-      },
-      []() {
-        Engine::Get()->Start();
-      },
-      []() {
-        // Make children single threaded since they are typically workers
-        dmlc::SetEnv("MXNET_CPU_WORKER_NTHREADS", 1);
-        dmlc::SetEnv("OMP_NUM_THREADS", 1);
-        OpenMP::Get()->set_enabled(false);
-        Engine::Get()->Start();
-      });
-#endif
   }
   ~ThreadedEnginePerDevice() noexcept(false) {
     this->StopNoWait();
diff --git a/src/initialize.cc b/src/initialize.cc
index 69d408d..1fd9262 100644
--- a/src/initialize.cc
+++ b/src/initialize.cc
@@ -25,6 +25,7 @@
 #include <signal.h>
 #include <dmlc/logging.h>
 #include <mxnet/engine.h>
+#include "./engine/openmp.h"
 
 namespace mxnet {
 #if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
@@ -42,6 +43,24 @@ class LibraryInitializer {
 #if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
     signal(SIGSEGV, SegfaultLogger);
 #endif
+
+// disable openmp for multithreaded workers
+#ifndef _WIN32
+    pthread_atfork(
+      []() {
+        Engine::Get()->Stop();
+      },
+      []() {
+        Engine::Get()->Start();
+      },
+      []() {
+        // Make children single threaded since they are typically workers
+        dmlc::SetEnv("MXNET_CPU_WORKER_NTHREADS", 1);
+        dmlc::SetEnv("OMP_NUM_THREADS", 1);
+        engine::OpenMP::Get()->set_enabled(false);
+        Engine::Get()->Start();
+      });
+#endif
   }
 
   static LibraryInitializer* Get();

-- 
To stop receiving notification emails like this one, please contact
zhasheng@apache.org.