Posted to commits@mxnet.apache.org by zh...@apache.org on 2019/02/13 21:36:53 UTC

[incubator-mxnet] branch master updated: Add pin_device_id option to Gluon DataLoader (#14136)

This is an automated email from the ASF dual-hosted git repository.

zhreshold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b1761f  Add pin_device_id option to Gluon DataLoader (#14136)
0b1761f is described below

commit 0b1761ff118e4724a9d934aa018de90acadda17f
Author: Yuxi Hu <da...@gmail.com>
AuthorDate: Wed Feb 13 13:36:28 2019 -0800

    Add pin_device_id option to Gluon DataLoader (#14136)
    
    * add pin_device_id option to DataLoader
    
    * add unit test to check output context
    
    * trigger CI
---
 python/mxnet/gluon/data/dataloader.py    | 37 +++++++++++++++++++++-----------
 tests/python/unittest/test_gluon_data.py | 24 +++++++++++++++++++++
 2 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py
index 9d76274..934f2d5 100644
--- a/python/mxnet/gluon/data/dataloader.py
+++ b/python/mxnet/gluon/data/dataloader.py
@@ -169,14 +169,15 @@ def worker_loop_v1(dataset, key_queue, data_queue, batchify_fn):
         batch = batchify_fn([dataset[i] for i in samples])
         data_queue.put((idx, batch))
 
-def fetcher_loop_v1(data_queue, data_buffer, pin_memory=False, data_buffer_lock=None):
+def fetcher_loop_v1(data_queue, data_buffer, pin_memory=False,
+                    pin_device_id=0, data_buffer_lock=None):
     """Fetcher loop for fetching data from queue and put in reorder dict."""
     while True:
         idx, batch = data_queue.get()
         if idx is None:
             break
         if pin_memory:
-            batch = _as_in_context(batch, context.cpu_pinned())
+            batch = _as_in_context(batch, context.cpu_pinned(pin_device_id))
         else:
             batch = _as_in_context(batch, context.cpu())
         if data_buffer_lock is not None:
@@ -188,8 +189,8 @@ def fetcher_loop_v1(data_queue, data_buffer, pin_memory=False, data_buffer_lock=
 
 class _MultiWorkerIterV1(object):
     """Internal multi-worker iterator for DataLoader."""
-    def __init__(self, num_workers, dataset, batchify_fn, batch_sampler, pin_memory=False,
-                 worker_fn=worker_loop_v1):
+    def __init__(self, num_workers, dataset, batchify_fn, batch_sampler,
+                 pin_memory=False, pin_device_id=0, worker_fn=worker_loop_v1):
         assert num_workers > 0, "_MultiWorkerIter is not for {} workers".format(num_workers)
         self._num_workers = num_workers
         self._dataset = dataset
@@ -218,7 +219,8 @@ class _MultiWorkerIterV1(object):
 
         self._fetcher = threading.Thread(
             target=fetcher_loop_v1,
-            args=(self._data_queue, self._data_buffer, pin_memory, self._data_buffer_lock))
+            args=(self._data_queue, self._data_buffer, pin_memory,
+                  pin_device_id, self._data_buffer_lock))
         self._fetcher.daemon = True
         self._fetcher.start()
 
@@ -323,12 +325,15 @@ class DataLoaderV1(object):
         If ``True``, the dataloader will copy NDArrays into pinned memory
         before returning them. Copying from CPU pinned memory to GPU is faster
         than from normal CPU memory.
+    pin_device_id : int, default 0
+        The device id to use for allocating pinned memory if pin_memory is ``True``
     """
     def __init__(self, dataset, batch_size=None, shuffle=False, sampler=None,
                  last_batch=None, batch_sampler=None, batchify_fn=None,
-                 num_workers=0, pin_memory=False):
+                 num_workers=0, pin_memory=False, pin_device_id=0):
         self._dataset = dataset
         self._pin_memory = pin_memory
+        self._pin_device_id = pin_device_id
 
         if batch_sampler is None:
             if batch_size is None:
@@ -365,13 +370,14 @@ class DataLoaderV1(object):
                 for batch in self._batch_sampler:
                     ret = self._batchify_fn([self._dataset[idx] for idx in batch])
                     if self._pin_memory:
-                        ret = _as_in_context(ret, context.cpu_pinned())
+                        ret = _as_in_context(ret, context.cpu_pinned(self._pin_device_id))
                     yield ret
             return same_process_iter()
 
         # multi-worker
         return _MultiWorkerIterV1(self._num_workers, self._dataset,
-                                  self._batchify_fn, self._batch_sampler, self._pin_memory)
+                                  self._batchify_fn, self._batch_sampler,
+                                  self._pin_memory, self._pin_device_id)
 
     def __len__(self):
         return len(self._batch_sampler)
@@ -403,7 +409,7 @@ def _thread_worker_fn(samples, batchify_fn, dataset):
 class _MultiWorkerIter(object):
     """Internal multi-worker iterator for DataLoader."""
     def __init__(self, worker_pool, batchify_fn, batch_sampler, pin_memory=False,
-                 worker_fn=_worker_fn, prefetch=0, dataset=None):
+                 pin_device_id=0, worker_fn=_worker_fn, prefetch=0, dataset=None):
         self._worker_pool = worker_pool
         self._batchify_fn = batchify_fn
         self._batch_sampler = batch_sampler
@@ -413,6 +419,7 @@ class _MultiWorkerIter(object):
         self._iter = iter(self._batch_sampler)
         self._worker_fn = worker_fn
         self._pin_memory = pin_memory
+        self._pin_device_id = pin_device_id
         self._dataset = dataset
         # pre-fetch
         for _ in range(prefetch):
@@ -442,7 +449,7 @@ class _MultiWorkerIter(object):
         ret = self._data_buffer.pop(self._rcvd_idx)
         batch = pickle.loads(ret.get()) if self._dataset is None else ret.get()
         if self._pin_memory:
-            batch = _as_in_context(batch, context.cpu_pinned())
+            batch = _as_in_context(batch, context.cpu_pinned(self._pin_device_id))
         batch = batch[0] if len(batch) == 1 else batch
         self._rcvd_idx += 1
         return batch
@@ -498,6 +505,8 @@ class DataLoader(object):
         If ``True``, the dataloader will copy NDArrays into pinned memory
         before returning them. Copying from CPU pinned memory to GPU is faster
         than from normal CPU memory.
+    pin_device_id : int, default 0
+        The device id to use for allocating pinned memory if pin_memory is ``True``
     prefetch : int, default is `num_workers * 2`
         The number of prefetching batches only works if `num_workers` > 0.
         If `prefetch` > 0, it allow worker process to prefetch certain batches before
@@ -514,9 +523,11 @@ class DataLoader(object):
     """
     def __init__(self, dataset, batch_size=None, shuffle=False, sampler=None,
                  last_batch=None, batch_sampler=None, batchify_fn=None,
-                 num_workers=0, pin_memory=False, prefetch=None, thread_pool=False):
+                 num_workers=0, pin_memory=False, pin_device_id=0,
+                 prefetch=None, thread_pool=False):
         self._dataset = dataset
         self._pin_memory = pin_memory
+        self._pin_device_id = pin_device_id
         self._thread_pool = thread_pool
 
         if batch_sampler is None:
@@ -562,13 +573,13 @@ class DataLoader(object):
                 for batch in self._batch_sampler:
                     ret = self._batchify_fn([self._dataset[idx] for idx in batch])
                     if self._pin_memory:
-                        ret = _as_in_context(ret, context.cpu_pinned())
+                        ret = _as_in_context(ret, context.cpu_pinned(self._pin_device_id))
                     yield ret
             return same_process_iter()
 
         # multi-worker
         return _MultiWorkerIter(self._worker_pool, self._batchify_fn, self._batch_sampler,
-                                pin_memory=self._pin_memory,
+                                pin_memory=self._pin_memory, pin_device_id=self._pin_device_id,
                                 worker_fn=_thread_worker_fn if self._thread_pool else _worker_fn,
                                 prefetch=self._prefetch,
                                 dataset=self._dataset if self._thread_pool else None)
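
For context, a minimal usage sketch of the new option (illustrative only and not part of this patch; the toy ArrayDataset is an assumption, and the gpu(1) copy assumes a CUDA build with at least two devices):

    import numpy as np
    from mxnet import gluon, context

    # Toy dataset, for illustration only.
    dataset = gluon.data.ArrayDataset(np.random.uniform(size=(100, 20)))

    # Pin batches in page-locked host memory registered against GPU 1 so
    # that the subsequent host-to-device copy onto that GPU is faster.
    loader = gluon.data.DataLoader(dataset, batch_size=8,
                                   pin_memory=True, pin_device_id=1)
    for batch in loader:
        batch = batch.as_in_context(context.gpu(1))
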
diff --git a/tests/python/unittest/test_gluon_data.py b/tests/python/unittest/test_gluon_data.py
index 353a819..1939de8 100644
--- a/tests/python/unittest/test_gluon_data.py
+++ b/tests/python/unittest/test_gluon_data.py
@@ -256,6 +256,30 @@ def test_multi_worker_dataloader_release_pool():
         del the_iter
         del D
 
+
+def test_dataloader_context():
+    X = np.random.uniform(size=(10, 20))
+    dataset = gluon.data.ArrayDataset(X)
+    default_dev_id = 0
+    custom_dev_id = 1
+
+    # use non-pinned memory
+    loader1 = gluon.data.DataLoader(dataset, 8)
+    for _, x in enumerate(loader1):
+        assert x.context == context.cpu(default_dev_id)
+
+    # use pinned memory with default device id
+    loader2 = gluon.data.DataLoader(dataset, 8, pin_memory=True)
+    for _, x in enumerate(loader2):
+        assert x.context == context.cpu_pinned(default_dev_id)
+
+    # use pinned memory with custom device id
+    loader3 = gluon.data.DataLoader(dataset, 8, pin_memory=True,
+                                    pin_device_id=custom_dev_id)
+    for _, x in enumerate(loader3):
+        assert x.context == context.cpu_pinned(custom_dev_id)
+
+
 if __name__ == '__main__':
     import nose
     nose.runmodule()
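
A footnote on the new test: the assertions compare the Context label (device type plus device id) rather than where the memory physically lives; on builds without CUDA, cpu_pinned allocations fall back to ordinary host memory but keep their label, which is why the test can run on CPU-only CI. A minimal sketch of that equality semantics (assumes a standard MXNet install):

    import mxnet as mx

    # Context objects are lightweight labels; equality compares device
    # type and device id, and constructing one allocates no memory.
    assert mx.context.cpu_pinned(1) == mx.Context('cpu_pinned', 1)
    assert mx.context.cpu_pinned(0) != mx.context.cpu_pinned(1)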