You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by zh...@apache.org on 2019/02/13 21:36:53 UTC
[incubator-mxnet] branch master updated: Add pin_device_id option
to Gluon DataLoader (#14136)
This is an automated email from the ASF dual-hosted git repository.
zhreshold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push:
new 0b1761f Add pin_device_id option to Gluon DataLoader (#14136)
0b1761f is described below
commit 0b1761ff118e4724a9d934aa018de90acadda17f
Author: Yuxi Hu <da...@gmail.com>
AuthorDate: Wed Feb 13 13:36:28 2019 -0800
Add pin_device_id option to Gluon DataLoader (#14136)
* add pin_device_id option to DataLoader
* add unit test to check output context
* trigger CI
---
python/mxnet/gluon/data/dataloader.py | 37 +++++++++++++++++++++-----------
tests/python/unittest/test_gluon_data.py | 24 +++++++++++++++++++++
2 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py
index 9d76274..934f2d5 100644
--- a/python/mxnet/gluon/data/dataloader.py
+++ b/python/mxnet/gluon/data/dataloader.py
@@ -169,14 +169,15 @@ def worker_loop_v1(dataset, key_queue, data_queue, batchify_fn):
batch = batchify_fn([dataset[i] for i in samples])
data_queue.put((idx, batch))
-def fetcher_loop_v1(data_queue, data_buffer, pin_memory=False, data_buffer_lock=None):
+def fetcher_loop_v1(data_queue, data_buffer, pin_memory=False,
+ pin_device_id=0, data_buffer_lock=None):
"""Fetcher loop that fetches data from the queue and puts it in the reorder dict."""
while True:
idx, batch = data_queue.get()
if idx is None:
break
if pin_memory:
- batch = _as_in_context(batch, context.cpu_pinned())
+ batch = _as_in_context(batch, context.cpu_pinned(pin_device_id))
else:
batch = _as_in_context(batch, context.cpu())
if data_buffer_lock is not None:
@@ -188,8 +189,8 @@ def fetcher_loop_v1(data_queue, data_buffer, pin_memory=False, data_buffer_lock=
class _MultiWorkerIterV1(object):
"""Internal multi-worker iterator for DataLoader."""
- def __init__(self, num_workers, dataset, batchify_fn, batch_sampler, pin_memory=False,
- worker_fn=worker_loop_v1):
+ def __init__(self, num_workers, dataset, batchify_fn, batch_sampler,
+ pin_memory=False, pin_device_id=0, worker_fn=worker_loop_v1):
assert num_workers > 0, "_MultiWorkerIter is not for {} workers".format(num_workers)
self._num_workers = num_workers
self._dataset = dataset
@@ -218,7 +219,8 @@ class _MultiWorkerIterV1(object):
self._fetcher = threading.Thread(
target=fetcher_loop_v1,
- args=(self._data_queue, self._data_buffer, pin_memory, self._data_buffer_lock))
+ args=(self._data_queue, self._data_buffer, pin_memory,
+ pin_device_id, self._data_buffer_lock))
self._fetcher.daemon = True
self._fetcher.start()
@@ -323,12 +325,15 @@ class DataLoaderV1(object):
If ``True``, the dataloader will copy NDArrays into pinned memory
before returning them. Copying from CPU pinned memory to GPU is faster
than from normal CPU memory.
+ pin_device_id : int, default 0
+ The device id to use for allocating pinned memory if pin_memory is ``True``
"""
def __init__(self, dataset, batch_size=None, shuffle=False, sampler=None,
last_batch=None, batch_sampler=None, batchify_fn=None,
- num_workers=0, pin_memory=False):
+ num_workers=0, pin_memory=False, pin_device_id=0):
self._dataset = dataset
self._pin_memory = pin_memory
+ self._pin_device_id = pin_device_id
if batch_sampler is None:
if batch_size is None:
@@ -365,13 +370,14 @@ class DataLoaderV1(object):
for batch in self._batch_sampler:
ret = self._batchify_fn([self._dataset[idx] for idx in batch])
if self._pin_memory:
- ret = _as_in_context(ret, context.cpu_pinned())
+ ret = _as_in_context(ret, context.cpu_pinned(self._pin_device_id))
yield ret
return same_process_iter()
# multi-worker
return _MultiWorkerIterV1(self._num_workers, self._dataset,
- self._batchify_fn, self._batch_sampler, self._pin_memory)
+ self._batchify_fn, self._batch_sampler,
+ self._pin_memory, self._pin_device_id)
def __len__(self):
return len(self._batch_sampler)
@@ -403,7 +409,7 @@ def _thread_worker_fn(samples, batchify_fn, dataset):
class _MultiWorkerIter(object):
"""Internal multi-worker iterator for DataLoader."""
def __init__(self, worker_pool, batchify_fn, batch_sampler, pin_memory=False,
- worker_fn=_worker_fn, prefetch=0, dataset=None):
+ pin_device_id=0, worker_fn=_worker_fn, prefetch=0, dataset=None):
self._worker_pool = worker_pool
self._batchify_fn = batchify_fn
self._batch_sampler = batch_sampler
@@ -413,6 +419,7 @@ class _MultiWorkerIter(object):
self._iter = iter(self._batch_sampler)
self._worker_fn = worker_fn
self._pin_memory = pin_memory
+ self._pin_device_id = pin_device_id
self._dataset = dataset
# pre-fetch
for _ in range(prefetch):
@@ -442,7 +449,7 @@ class _MultiWorkerIter(object):
ret = self._data_buffer.pop(self._rcvd_idx)
batch = pickle.loads(ret.get()) if self._dataset is None else ret.get()
if self._pin_memory:
- batch = _as_in_context(batch, context.cpu_pinned())
+ batch = _as_in_context(batch, context.cpu_pinned(self._pin_device_id))
batch = batch[0] if len(batch) == 1 else batch
self._rcvd_idx += 1
return batch
@@ -498,6 +505,8 @@ class DataLoader(object):
If ``True``, the dataloader will copy NDArrays into pinned memory
before returning them. Copying from CPU pinned memory to GPU is faster
than from normal CPU memory.
+ pin_device_id : int, default 0
+ The device id to use for allocating pinned memory if pin_memory is ``True``
prefetch : int, default is `num_workers * 2`
The number of prefetching batches only works if `num_workers` > 0.
If `prefetch` > 0, it allows worker processes to prefetch certain batches before
@@ -514,9 +523,11 @@ class DataLoader(object):
"""
def __init__(self, dataset, batch_size=None, shuffle=False, sampler=None,
last_batch=None, batch_sampler=None, batchify_fn=None,
- num_workers=0, pin_memory=False, prefetch=None, thread_pool=False):
+ num_workers=0, pin_memory=False, pin_device_id=0,
+ prefetch=None, thread_pool=False):
self._dataset = dataset
self._pin_memory = pin_memory
+ self._pin_device_id = pin_device_id
self._thread_pool = thread_pool
if batch_sampler is None:
@@ -562,13 +573,13 @@ class DataLoader(object):
for batch in self._batch_sampler:
ret = self._batchify_fn([self._dataset[idx] for idx in batch])
if self._pin_memory:
- ret = _as_in_context(ret, context.cpu_pinned())
+ ret = _as_in_context(ret, context.cpu_pinned(self._pin_device_id))
yield ret
return same_process_iter()
# multi-worker
return _MultiWorkerIter(self._worker_pool, self._batchify_fn, self._batch_sampler,
- pin_memory=self._pin_memory,
+ pin_memory=self._pin_memory, pin_device_id=self._pin_device_id,
worker_fn=_thread_worker_fn if self._thread_pool else _worker_fn,
prefetch=self._prefetch,
dataset=self._dataset if self._thread_pool else None)
diff --git a/tests/python/unittest/test_gluon_data.py b/tests/python/unittest/test_gluon_data.py
index 353a819..1939de8 100644
--- a/tests/python/unittest/test_gluon_data.py
+++ b/tests/python/unittest/test_gluon_data.py
@@ -256,6 +256,30 @@ def test_multi_worker_dataloader_release_pool():
del the_iter
del D
+
+def test_dataloader_context():
+ X = np.random.uniform(size=(10, 20))
+ dataset = gluon.data.ArrayDataset(X)
+ default_dev_id = 0
+ custom_dev_id = 1
+
+ # use non-pinned memory
+ loader1 = gluon.data.DataLoader(dataset, 8)
+ for _, x in enumerate(loader1):
+ assert x.context == context.cpu(default_dev_id)
+
+ # use pinned memory with default device id
+ loader2 = gluon.data.DataLoader(dataset, 8, pin_memory=True)
+ for _, x in enumerate(loader2):
+ assert x.context == context.cpu_pinned(default_dev_id)
+
+ # use pinned memory with custom device id
+ loader3 = gluon.data.DataLoader(dataset, 8, pin_memory=True,
+ pin_device_id=custom_dev_id)
+ for _, x in enumerate(loader3):
+ assert x.context == context.cpu_pinned(custom_dev_id)
+
+
if __name__ == '__main__':
import nose
nose.runmodule()