You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by jx...@apache.org on 2017/11/22 01:15:17 UTC
[incubator-mxnet] branch master updated: a user friendly way to use
g2c in module and an example of g2c (#8632)
This is an automated email from the ASF dual-hosted git repository.
jxie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push:
new ec6144f a user friendly way to use g2c in module and an example of g2c (#8632)
ec6144f is described below
commit ec6144f87bbbb30620300d457101ab847ff127c9
Author: Ziyue Huang <zy...@gmail.com>
AuthorDate: Wed Nov 22 09:15:14 2017 +0800
a user friendly way to use g2c in module and an example of g2c (#8632)
* a user friendly way to use g2c in module
* also support g2c to be list
* update
* update test
* g2c example
* Update matrix_factorization_model_parallel.py
* address comments
* update
* update
* remove fc
* debug g2c
* Revert "debug g2c"
This reverts commit caabdc5c5fa8618d3ed4db2cbad4e807b63c211e.
* update
* move g2c example to another folder
* update
* readme
---
.../lstm}/README.md | 0
.../lstm}/get_ptb_data.sh | 0
.../lstm}/lstm.py | 0
.../lstm}/lstm_ptb.py | 0
.../matrix_factorization/get_data.py | 56 +++++++++++
.../matrix_fact_parallel_model.py | 56 +++++++++++
.../matrix_factorization_model_parallel.py | 106 +++++++++++++++++++++
.../model-parallel/matrix_factorization/readme.md | 6 ++
python/mxnet/module/bucketing_module.py | 3 +-
python/mxnet/module/executor_group.py | 37 ++++++-
python/mxnet/module/module.py | 3 +-
tests/python/unittest/test_module.py | 61 +++++++-----
12 files changed, 295 insertions(+), 33 deletions(-)
diff --git a/example/model-parallel-lstm/README.md b/example/model-parallel/lstm/README.md
similarity index 100%
rename from example/model-parallel-lstm/README.md
rename to example/model-parallel/lstm/README.md
diff --git a/example/model-parallel-lstm/get_ptb_data.sh b/example/model-parallel/lstm/get_ptb_data.sh
similarity index 100%
rename from example/model-parallel-lstm/get_ptb_data.sh
rename to example/model-parallel/lstm/get_ptb_data.sh
diff --git a/example/model-parallel-lstm/lstm.py b/example/model-parallel/lstm/lstm.py
similarity index 100%
rename from example/model-parallel-lstm/lstm.py
rename to example/model-parallel/lstm/lstm.py
diff --git a/example/model-parallel-lstm/lstm_ptb.py b/example/model-parallel/lstm/lstm_ptb.py
similarity index 100%
rename from example/model-parallel-lstm/lstm_ptb.py
rename to example/model-parallel/lstm/lstm_ptb.py
diff --git a/example/model-parallel/matrix_factorization/get_data.py b/example/model-parallel/matrix_factorization/get_data.py
new file mode 100644
index 0000000..bb2503a
--- /dev/null
+++ b/example/model-parallel/matrix_factorization/get_data.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import mxnet as mx
+
+
def get_movielens_data(prefix):
    """Download and unpack the MovieLens archive unless it is already present.

    Parameters
    ----------
    prefix : str
        Base name of the dataset archive; ``<prefix>.zip`` is looked up in
        the current working directory and, when missing, fetched with
        ``wget``, extracted with ``unzip`` and split with the
        ``split_ratings.sh`` script found in ``ml-10M100K``.

    Notes
    -----
    The shell commands' exit statuses are not inspected.
    """
    # Nothing to do when the archive has already been downloaded.
    if os.path.exists("%s.zip" % prefix):
        return

    print("Dataset MovieLens 10M not present. Downloading now ...")
    shell_steps = (
        "wget http://files.grouplens.org/datasets/movielens/%s.zip" % prefix,
        "unzip %s.zip" % prefix,
        "cd ml-10M100K; sh split_ratings.sh; cd -;",
    )
    for step in shell_steps:
        os.system(step)
+
def get_movielens_iter(filename, batch_size):
    """Parse one MovieLens ratings file and wrap it in a prefetching iterator.

    Not particularly fast code: the text file is parsed line by line in
    Python before being loaded into NDArrays.

    Parameters
    ----------
    filename : str
        Path of a ratings file whose records are ``'::'``-separated and have
        exactly four fields; the first three are used as user, item and
        score, the fourth is ignored.
    batch_size : int
        Number of examples per batch.

    Returns
    -------
    mx.io.PrefetchingIter
        A single iterator (call this function once per file to obtain the
        train and the validation iterators) wrapping a shuffled
        ``NDArrayIter`` with data fields ``'user'`` and ``'item'`` and label
        field ``'score'``.
    """
    print("Preparing data iterators for " + filename + " ... ")
    user = []
    item = []
    score = []
    with open(filename, 'r') as f:
        for line in f:
            tks = line.strip().split('::')
            # skip anything that is not a complete four-field record
            if len(tks) != 4:
                continue
            user.append(tks[0])
            item.append(tks[1])
            score.append(tks[2])
    # convert to ndarrays (the string fields are converted to numbers here)
    user = mx.nd.array(user, dtype='int32')
    item = mx.nd.array(item)
    score = mx.nd.array(score)
    # prepare the data iter
    data = {'user': user, 'item': item}
    label = {'score': score}
    nd_iter = mx.io.NDArrayIter(data=data, label=label,
                                batch_size=batch_size, shuffle=True)
    return mx.io.PrefetchingIter(nd_iter)
diff --git a/example/model-parallel/matrix_factorization/matrix_fact_parallel_model.py b/example/model-parallel/matrix_factorization/matrix_fact_parallel_model.py
new file mode 100644
index 0000000..f4004d1
--- /dev/null
+++ b/example/model-parallel/matrix_factorization/matrix_fact_parallel_model.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import mxnet as mx
+
+def matrix_fact_model_parallel_net(factor_size, num_hidden, max_user, max_item):
+ # Build the matrix factorization symbol whose operators are tagged with
+ # the ctx_group attributes 'dev1' and 'dev2'.
+ #
+ # factor_size : output_dim of the user and item Embedding layers
+ # num_hidden  : num_hidden of the FullyConnected layer applied to each side
+ # max_user    : input_dim of the user Embedding
+ # max_item    : input_dim of the item Embedding
+ #
+ # Returns the LinearRegressionOutput symbol that compares the prediction
+ # with the 'score' label variable.
+ #
+ # NOTE(review): indentation was lost in this rendering, so which of the
+ # statements after the second AttrScope are inside it cannot be confirmed
+ # here - TODO confirm against the repository.
+ #
+ # set ctx_group attribute to 'dev1' for the symbols created in this scope,
+ # the symbols will be bound to the context that 'dev1' maps to in group2ctxs
+ with mx.AttrScope(ctx_group='dev1'):
+ # input
+ user = mx.symbol.Variable('user')
+ item = mx.symbol.Variable('item')
+ # user feature lookup
+ user_weight = mx.symbol.Variable('user_weight')
+ user = mx.symbol.Embedding(data=user, weight=user_weight,
+ input_dim=max_user, output_dim=factor_size)
+ # item feature lookup
+ item_weight = mx.symbol.Variable('item_weight')
+ item = mx.symbol.Embedding(data=item, weight=item_weight,
+ input_dim=max_item, output_dim=factor_size)
+ # set ctx_group attribute to 'dev2' for the symbols created in this scope,
+ # the symbols will be bound to the context that 'dev2' maps to in group2ctxs
+ with mx.AttrScope(ctx_group='dev2'):
+ # non-linear transformation of user features
+ user = mx.symbol.Activation(data=user, act_type='relu')
+ fc_user_weight = mx.symbol.Variable('fc_user_weight')
+ fc_user_bias = mx.symbol.Variable('fc_user_bias')
+ user = mx.symbol.FullyConnected(data=user, weight=fc_user_weight, bias=fc_user_bias, num_hidden=num_hidden)
+ # non-linear transformation of item features
+ item = mx.symbol.Activation(data=item, act_type='relu')
+ fc_item_weight = mx.symbol.Variable('fc_item_weight')
+ fc_item_bias = mx.symbol.Variable('fc_item_bias')
+ item = mx.symbol.FullyConnected(data=item, weight=fc_item_weight, bias=fc_item_bias, num_hidden=num_hidden)
+ # predict by the inner product, which is elementwise product and then sum
+ pred = user * item
+ pred = mx.symbol.sum(data=pred, axis=1)
+ pred = mx.symbol.Flatten(data=pred)
+ # label
+ score = mx.symbol.Variable('score')
+ # loss layer
+ pred = mx.symbol.LinearRegressionOutput(data=pred, label=score)
+ return pred
diff --git a/example/model-parallel/matrix_factorization/matrix_factorization_model_parallel.py b/example/model-parallel/matrix_factorization/matrix_factorization_model_parallel.py
new file mode 100644
index 0000000..ab607f1
--- /dev/null
+++ b/example/model-parallel/matrix_factorization/matrix_factorization_model_parallel.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import logging
+import time
+import mxnet as mx
+import numpy as np
+from get_data import get_movielens_iter, get_movielens_data
+from matrix_fact_parallel_model import matrix_fact_model_parallel_net
+
+
+# Training script for the model parallel matrix factorization example:
+# parses the command line, prepares the MovieLens iterators, builds the
+# symbol and fits a Module whose ctx_groups are mapped through group2ctxs.
+logging.basicConfig(level=logging.DEBUG)
+
+parser = argparse.ArgumentParser(description="Run model parallel version of matrix factorization",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+parser.add_argument('--num-epoch', type=int, default=3,
+ help='number of epochs to train')
+parser.add_argument('--batch-size', type=int, default=256,
+ help='number of examples per batch')
+parser.add_argument('--print-every', type=int, default=100,
+ help='logging interval')
+parser.add_argument('--factor-size', type=int, default=128,
+ help="the factor size of the embedding operation")
+parser.add_argument('--num-gpus', type=int, default=2,
+ help="number of gpus to use")
+
+# Dataset description: archive prefix handed to get_movielens_data, the
+# train/validation files handed to get_movielens_iter, and the two sizes
+# passed to the network constructor as max_user / max_item.
+MOVIELENS = {
+ 'dataset': 'ml-10m',
+ 'train': './ml-10M100K/r1.train',
+ 'val': './ml-10M100K/r1.test',
+ 'max_user': 71569,
+ 'max_movie': 65135,
+}
+
+if __name__ == '__main__':
+ head = '%(asctime)-15s %(message)s'
+ # NOTE(review): logging.basicConfig does nothing once the root logger has
+ # handlers, and the module-level call above presumably installed one
+ # already, so this level/format likely never takes effect - TODO confirm
+ # and keep only one of the two calls.
+ logging.basicConfig(level=logging.INFO, format=head)
+
+ # arg parser
+ args = parser.parse_args()
+ logging.info(args)
+ num_epoch = args.num_epoch
+ batch_size = args.batch_size
+ optimizer = 'sgd'
+ factor_size = args.factor_size
+ print_every = args.print_every
+ num_gpus = args.num_gpus
+
+ momentum = 0.9
+ learning_rate = 0.1
+
+ # prepare dataset and iterators
+ max_user = MOVIELENS['max_user']
+ max_movies = MOVIELENS['max_movie']
+ get_movielens_data(MOVIELENS['dataset'])
+ train_iter = get_movielens_iter(MOVIELENS['train'], batch_size)
+ val_iter = get_movielens_iter(MOVIELENS['val'], batch_size)
+
+ # construct the model
+ # (factor_size is passed for both the embedding size and num_hidden)
+ net = matrix_fact_model_parallel_net(factor_size, factor_size, max_user, max_movies)
+
+ # construct the module
+ # map the ctx_group attribute to the context assignment
+ # ('dev1' is given one cpu context, 'dev2' a list of num_gpus gpu contexts;
+ # the context list itself also has num_gpus entries)
+ group2ctxs={'dev1':mx.cpu(), 'dev2':[mx.gpu(i) for i in range(num_gpus)]}
+ mod = mx.module.Module(symbol=net, context=[mx.cpu()]*num_gpus, data_names=['user', 'item'],
+ label_names=['score'], group2ctxs=group2ctxs)
+
+ # the initializer used to initialize the parameters
+ initializer = mx.init.Xavier(factor_type="in", magnitude=2.34)
+
+ # the parameters for the optimizer constructor
+ optimizer_params = {
+ 'learning_rate': learning_rate,
+ 'wd': 1e-4,
+ 'momentum': momentum,
+ 'rescale_grad': 1.0/batch_size}
+
+ # use MSE as the metric
+ metric = mx.metric.create(['MSE'])
+
+ # log progress every print_every batches
+ speedometer = mx.callback.Speedometer(batch_size, print_every)
+
+ # start training
+ mod.fit(train_iter,
+ val_iter,
+ eval_metric = metric,
+ num_epoch = num_epoch,
+ optimizer = optimizer,
+ optimizer_params = optimizer_params,
+ initializer = initializer,
+ batch_end_callback = speedometer)
diff --git a/example/model-parallel/matrix_factorization/readme.md b/example/model-parallel/matrix_factorization/readme.md
new file mode 100644
index 0000000..5d724ae
--- /dev/null
+++ b/example/model-parallel/matrix_factorization/readme.md
@@ -0,0 +1,6 @@
+Model Parallel Matrix Factorization
+==============
+
+The example demonstrates the basic usage of `group2ctxs` in `Module`, which allows one part of the model to be trained on the CPU and the other part on the GPU.
+
+- `python matrix_factorization_model_parallel.py --num-gpus 2`
diff --git a/python/mxnet/module/bucketing_module.py b/python/mxnet/module/bucketing_module.py
index 0bea260..d93ef3b 100644
--- a/python/mxnet/module/bucketing_module.py
+++ b/python/mxnet/module/bucketing_module.py
@@ -52,7 +52,8 @@ class BucketingModule(BaseModule):
state_names : list of str
States are similar to data and label, but not provided by data iterator.
Instead they are initialized to 0 and can be set by set_states()
- group2ctxs : list of dict of str to context
+ group2ctxs : dict of str to context or list of context,
+ or list of dict of str to context
Default is `None`. Mapping the `ctx_group` attribute to the context assignment.
compression_params : dict
Specifies type of gradient compression and additional arguments depending
diff --git a/python/mxnet/module/executor_group.py b/python/mxnet/module/executor_group.py
index ea7651b..2d680d4 100755
--- a/python/mxnet/module/executor_group.py
+++ b/python/mxnet/module/executor_group.py
@@ -95,6 +95,35 @@ def _merge_multi_context(outputs, major_axis):
rets.append(tensors[0])
return rets
+def _prepare_group2ctxs(group2ctxs, ctx_len):
+ """Prepare the group2contexts, will duplicate the context
+ if some ctx_group map to only one context.
+ """
+ if group2ctxs is None:
+ return [None] * ctx_len
+ elif isinstance(group2ctxs, list):
+ assert(len(group2ctxs) == ctx_len), "length of group2ctxs\
+ should be %d" % ctx_len
+ return group2ctxs
+ elif isinstance(group2ctxs, dict):
+ ret = [{}] * ctx_len
+ for k, v in group2ctxs.items():
+ ctxs = None
+ if isinstance(v, ctx.Context):
+ ctxs = [v] * ctx_len
+ else:
+ if len(v) == 1:
+ ctxs = v * ctx_len
+ else:
+ assert(len(v) == ctx_len), "length of group2ctxs[%s]\
+ should be %d or 1" % (k, ctx_len)
+ ctxs = v
+ for i in range(ctx_len):
+ ret[i][k] = ctxs[i]
+ return ret
+ else:
+ assert(False), "group2ctxs should be list of dict of str to context,\
+ or dict of str to context or list of context"
class DataParallelExecutorGroup(object):
"""A group of executors that lives on a group of devices.
@@ -139,7 +168,8 @@ class DataParallelExecutorGroup(object):
Requirement for gradient accumulation. Can be 'write', 'add', or 'null'
(default to 'write').
Can be specified globally (str) or for each argument (list, dict).
- group2ctxs : list of dict of str to context
+ group2ctxs : dict of str to context or list of context,
+ or list of dict of str to context
Default is `None`. Mapping the `ctx_group` attribute to the context assignment.
"""
def __init__(self, symbol, contexts, workload, data_shapes, label_shapes, param_names,
@@ -152,10 +182,7 @@ class DataParallelExecutorGroup(object):
self.symbol = symbol
self.contexts = contexts
self.workload = workload
- if group2ctxs is None:
- group2ctxs = [None] * len(self.contexts)
- assert len(group2ctxs) == len(self.contexts)
- self.group2ctxs = group2ctxs
+ self.group2ctxs = _prepare_group2ctxs(group2ctxs, len(contexts))
self.for_training = for_training
self.inputs_need_grad = inputs_need_grad
diff --git a/python/mxnet/module/module.py b/python/mxnet/module/module.py
index a9c6516..73895c1 100644
--- a/python/mxnet/module/module.py
+++ b/python/mxnet/module/module.py
@@ -59,7 +59,8 @@ class Module(BaseModule):
state_names : list of str
states are similar to data and label, but not provided by data iterator.
Instead they are initialized to 0 and can be set by `set_states()`.
- group2ctxs : list of dict of str to context
+ group2ctxs : dict of str to context or list of context,
+ or list of dict of str to context
Default is `None`. Mapping the `ctx_group` attribute to the context assignment.
compression_params : dict
Specifies type of gradient compression and additional arguments depending
diff --git a/tests/python/unittest/test_module.py b/tests/python/unittest/test_module.py
index a8fb99d..ed5fe26 100644
--- a/tests/python/unittest/test_module.py
+++ b/tests/python/unittest/test_module.py
@@ -71,31 +71,40 @@ def test_module_input_grads():
def test_module_ctx_group():
- with mx.AttrScope(ctx_group='dev1'):
- a = mx.symbol.Variable('a')
- a = a * 2
- with mx.AttrScope(ctx_group='dev2'):
- b = mx.symbol.Variable('b')
- c = a + b
- shape = (2, 5)
- mod1 = mx.mod.Module(c, context=[mx.cpu(0)], data_names=['a', 'b'], label_names=None,
- group2ctxs=[{'dev1':mx.cpu(1),'dev2':mx.cpu(2)}])
- mod1.bind(data_shapes=[['a', shape], ['b', shape]], inputs_need_grad=True)
- mod1.init_params()
- mod1.forward(data_batch=mx.io.DataBatch(data=[mx.nd.ones(shape), mx.nd.ones(shape)]), is_train=True)
- mod1.backward([mx.nd.ones(shape)])
- mod1_input_grads = mod1.get_input_grads()
-
- mod2 = mx.mod.Module(c, data_names=['a', 'b'], label_names=None)
- mod2.bind(data_shapes=[['a', shape], ['b', shape]], inputs_need_grad=True)
- mod2.init_params()
- mod2.forward(data_batch=mx.io.DataBatch(data=[mx.nd.ones(shape), mx.nd.ones(shape)]), is_train=True)
- mod2.backward([mx.nd.ones(shape)])
- mod2_input_grads = mod2.get_input_grads()
-
- assert np.all(mod1_input_grads[0].asnumpy() == mod2_input_grads[0].asnumpy())
- assert np.all(mod1_input_grads[1].asnumpy() == mod2_input_grads[1].asnumpy())
-
+ # Bind the same symbol twice on `ctxs` - once with `group2ctxs`, once
+ # without - run one forward/backward pass on all-ones inputs and require
+ # both modules to report identical input gradients.
+ def check_module_ctx_group(ctxs, group2ctxs):
+ # symbol c = a * 2 + b, built with the 'dev1' and 'dev2' ctx_group scopes
+ with mx.AttrScope(ctx_group='dev1'):
+ a = mx.symbol.Variable('a')
+ a = a * 2
+ with mx.AttrScope(ctx_group='dev2'):
+ b = mx.symbol.Variable('b')
+ c = a + b
+ shape = (2, 5)
+ # module under test: context groups mapped through group2ctxs
+ mod1 = mx.mod.Module(c, context=ctxs, data_names=['a', 'b'], label_names=None,
+ group2ctxs=group2ctxs)
+ mod1.bind(data_shapes=[['a', shape], ['b', shape]], inputs_need_grad=True)
+ mod1.init_params()
+ mod1.forward(data_batch=mx.io.DataBatch(data=[mx.nd.ones(shape), mx.nd.ones(shape)]), is_train=True)
+ mod1.backward([mx.nd.ones(shape)])
+ mod1_input_grads = mod1.get_input_grads()
+
+ # reference module: same symbol and contexts, no group2ctxs
+ mod2 = mx.mod.Module(c, context=ctxs, data_names=['a', 'b'], label_names=None)
+ mod2.bind(data_shapes=[['a', shape], ['b', shape]], inputs_need_grad=True)
+ mod2.init_params()
+ mod2.forward(data_batch=mx.io.DataBatch(data=[mx.nd.ones(shape), mx.nd.ones(shape)]), is_train=True)
+ mod2.backward([mx.nd.ones(shape)])
+ mod2_input_grads = mod2.get_input_grads()
+
+ assert np.all(mod1_input_grads[0].asnumpy() == mod2_input_grads[0].asnumpy())
+ assert np.all(mod1_input_grads[1].asnumpy() == mod2_input_grads[1].asnumpy())
+
+ # accepted group2ctxs forms: a dict of contexts on one device, a list of
+ # dicts, a dict of contexts on two devices, a dict with a one-element
+ # list, and dicts with full-length lists
+ # NOTE(review): in every dict-form case on two devices each group maps to
+ # the same context for both devices, so these cases would not reveal
+ # per-device entries being mixed up - consider distinct contexts.
+ check_module_ctx_group([mx.cpu(0)], {'dev1': mx.cpu(1), 'dev2': mx.cpu(2)})
+ check_module_ctx_group([mx.cpu(0), mx.cpu(1)],
+ [{'dev1': mx.cpu(2), 'dev2': mx.cpu(3)}, {'dev1': mx.cpu(4), 'dev2': mx.cpu(5)}])
+ check_module_ctx_group([mx.cpu(0), mx.cpu(1)], {'dev1': mx.cpu(2), 'dev2': mx.cpu(3)})
+ check_module_ctx_group([mx.cpu(0), mx.cpu(1)], {'dev1': mx.cpu(2), 'dev2': [mx.cpu(3)]})
+ check_module_ctx_group([mx.cpu(0), mx.cpu(1)], {'dev1':mx.cpu(2), 'dev2':[mx.cpu(3), mx.cpu(3)]})
+ check_module_ctx_group([mx.cpu(0), mx.cpu(1)],
+ {'dev1':[mx.cpu(2), mx.cpu(2)], 'dev2':[mx.cpu(3), mx.cpu(3)]})
def test_bucket_module_ctx_group():
num_hidden = 10
@@ -121,7 +130,7 @@ def test_bucket_module_ctx_group():
return sym, ('data',), ('label',)
mod = mx.mod.BucketingModule(sym_gen=sym_gen, default_bucket_key=10, context=[mx.cpu(0)],
- group2ctxs=[{'dev1':mx.cpu(1), 'dev2':mx.cpu(2)}])
+ group2ctxs=[{'dev1': mx.cpu(1), 'dev2': mx.cpu(2)}])
mod.bind(data_shapes=[['data', (batch_size, num_hidden)]],
label_shapes=[['label', (batch_size,)]],
for_training=True, inputs_need_grad=True)
--
To stop receiving notification emails like this one, please contact
['"commits@mxnet.apache.org" <co...@mxnet.apache.org>'].