You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by qk...@apache.org on 2017/08/02 21:51:36 UTC
[incubator-mxnet] branch master updated: [R] RNN bucketing with
multiple devices. (#7315)
This is an automated email from the ASF dual-hosted git repository.
qkou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git
The following commit(s) were added to refs/heads/master by this push:
new 5976973 [R] RNN bucketing with multiple devices. (#7315)
5976973 is described below
commit 59769736a402b834b5d53bcdaae895ac4a069e12
Author: Qiang Kou (KK) <qk...@qkou.info>
AuthorDate: Wed Aug 2 21:51:34 2017 +0000
[R] RNN bucketing with multiple devices. (#7315)
---
example/rnn/bucket_R/aclImdb_lstm_classification.R | 55 +++-----
example/rnn/bucket_R/mx.io.bucket.iter.R | 6 +-
example/rnn/bucket_R/rnn.R | 44 ++++---
example/rnn/bucket_R/rnn.train.R | 140 +++++++++++++++++----
4 files changed, 157 insertions(+), 88 deletions(-)
diff --git a/example/rnn/bucket_R/aclImdb_lstm_classification.R b/example/rnn/bucket_R/aclImdb_lstm_classification.R
index aaa6d38..bb5eaac 100644
--- a/example/rnn/bucket_R/aclImdb_lstm_classification.R
+++ b/example/rnn/bucket_R/aclImdb_lstm_classification.R
@@ -11,51 +11,30 @@ vocab <- length(corpus_bucketed_test$dic)
### Create iterators
batch.size <- 64
-train.data <- mx.io.bucket.iter(buckets = corpus_bucketed_train$buckets,
- batch.size = batch.size,
- data.mask.element = 0,
- shuffle = TRUE)
+num.round <- 16
-eval.data <- mx.io.bucket.iter(buckets = corpus_bucketed_test$buckets,
- batch.size = batch.size,
- data.mask.element = 0,
- shuffle = FALSE)
+train.data <- mx.io.bucket.iter(buckets = corpus_bucketed_train$buckets, batch.size = batch.size,
+ data.mask.element = 0, shuffle = TRUE)
+
+eval.data <- mx.io.bucket.iter(buckets = corpus_bucketed_test$buckets, batch.size = batch.size,
+ data.mask.element = 0, shuffle = FALSE)
mx.set.seed(0)
+optimizer <- mx.opt.create("adadelta", rho = 0.92, epsilon = 1e-06, wd = 2e-04, clip_gradient = NULL,
+ rescale.grad = 1/batch.size)
+
+model_sentiment_lstm <- mx.rnn.buckets(train.data = train.data, begin.round = 1,
+ num.round = num.round, ctx = mx.cpu(), metric = mx.metric.accuracy, optimizer = optimizer,
+ num.rnn.layer = 2, num.embed = 16, num.hidden = 24, num.label = 2, input.size = vocab,
+ initializer = mx.init.Xavier(rnd_type = "gaussian", factor_type = "in", magnitude = 2),
+ dropout = 0.25, config = "seq-to-one", batch.end.callback = mx.callback.log.train.metric(period = 50),
+ verbose = TRUE)
-end.round <- 16
-
-optimizer <- mx.opt.create("adadelta",
- rho = 0.92,
- epsilon = 1e-06,
- wd = 2e-04,
- clip_gradient = NULL,
- rescale.grad = 1/batch.size)
-
-model_sentiment_lstm <- mx.rnn.buckets(train.data = train.data,
- begin.round = 1,
- end.round = end.round,
- ctx = mx.cpu(),
- metric = mx.metric.accuracy,
- optimizer = optimizer,
- num.rnn.layer = 2,
- num.embed = 16,
- num.hidden = 24,
- num.label = 2,
- input.size = vocab,
- initializer = mx.init.Xavier(rnd_type = "gaussian",
- factor_type = "in",
- magnitude = 2),
- dropout = 0.25,
- config = "seq-to-one",
- batch.end.callback = mx.callback.log.train.metric(period = 50),
- verbose = TRUE)
-
-mx.model.save(model_sentiment_lstm, prefix = "model_sentiment_lstm", iteration = end.round)
+mx.model.save(model_sentiment_lstm, prefix = "model_sentiment_lstm", iteration = num.round)
source("rnn.infer.R")
-model <- mx.model.load("model_sentiment_lstm", iteration = end.round)
+model <- mx.model.load("model_sentiment_lstm", iteration = num.round)
pred <- mx.rnn.infer.buckets(infer_iter = eval.data, model, "seq-to-one", ctx = mx.cpu())
diff --git a/example/rnn/bucket_R/mx.io.bucket.iter.R b/example/rnn/bucket_R/mx.io.bucket.iter.R
index 887247a..61f8795 100644
--- a/example/rnn/bucket_R/mx.io.bucket.iter.R
+++ b/example/rnn/bucket_R/mx.io.bucket.iter.R
@@ -64,16 +64,14 @@ BucketIter <- setRefClass("BucketIter", fields = c("buckets", "bucket.names", "b
# to appropriate sequence length)
idx <- (.self$bucketID - 1) * (.self$batch.size) + (1:batch.size)
data <- .self$buckets[[names(.self$bucketID)]]$data[, idx, drop = F]
- data_mask <- as.integer(names(.self$bucketID)) - apply(data == .self$data.mask.element,
- 2, sum)
data_mask_array <- (!data == 0)
if (length(dim(.self$buckets[[names(.self$bucketID)]]$label)) == 0) {
label <- .self$buckets[[names(.self$bucketID)]]$label[idx]
} else {
label <- .self$buckets[[names(.self$bucketID)]]$label[, idx, drop = F]
}
- return(list(data = mx.nd.array(data), label = mx.nd.array(label), data.mask = mx.nd.array(data_mask),
- data.mask.array = mx.nd.array(data_mask_array)))
+ return(list(data = mx.nd.array(data), data.mask.array = mx.nd.array(data_mask_array),
+ label = mx.nd.array(label)))
}, finalize = function() {
}))
diff --git a/example/rnn/bucket_R/rnn.R b/example/rnn/bucket_R/rnn.R
index f55272f..ea02b95 100644
--- a/example/rnn/bucket_R/rnn.R
+++ b/example/rnn/bucket_R/rnn.R
@@ -33,7 +33,6 @@ rnn.unroll <- function(num.rnn.layer, seq.len, input.size, num.embed, num.hidden
# embeding layer
label <- mx.symbol.Variable("label")
data <- mx.symbol.Variable("data")
- data_mask <- mx.symbol.Variable("data.mask")
data_mask_array <- mx.symbol.Variable("data.mask.array")
data_mask_array <- mx.symbol.stop_gradient(data_mask_array, name = "data.mask.array")
@@ -112,8 +111,8 @@ rnn.unroll <- function(num.rnn.layer, seq.len, input.size, num.embed, num.hidden
mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidden,
num.embed, num.label, input.size, ctx = NULL, num.round = 1, initializer = mx.init.uniform(0.01),
dropout = 0, config = "one-to-one", optimizer = "sgd", batch.end.callback = NULL,
- epoch.end.callback = NULL, begin.round = 1, end.round = 1, metric = mx.metric.rmse,
- cell.type = "lstm", verbose = FALSE) {
+ epoch.end.callback = NULL, begin.round = 1, metric = mx.metric.rmse, cell.type = "lstm",
+ kvstore = "local", verbose = FALSE) {
if (!train.data$iter.next()) {
train.data$reset()
@@ -131,8 +130,11 @@ mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidd
if (is.null(ctx))
ctx <- mx.ctx.default()
- if (!is.mx.context(ctx))
- stop("ctx must be mx.context")
+ if (is.mx.context(ctx)) {
+ ctx <- list(ctx)
+ }
+ if (!is.list(ctx))
+ stop("ctx must be mx.context or list of mx.context")
if (is.character(optimizer)) {
if (is.numeric(input.shape)) {
ndim <- length(input.shape)
@@ -155,17 +157,28 @@ mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidd
symbol <- sym_list[[names(train.data$bucketID)]]
arg.names <- symbol$arguments
- input.shape <- lapply(train.data$value(), dim)
- input.shape <- input.shape[names(input.shape) %in% arg.names]
+ input.names <- c("data", "data.mask.array")
+ input.shape <- sapply(input.names, function(n) {
+ dim(train.data$value()[[n]])
+ }, simplify = FALSE)
+ output.names <- "label"
+ output.shape <- sapply(output.names, function(n) {
+ dim(train.data$value()[[n]])
+ }, simplify = FALSE)
+
+ params <- mx.model.init.params(symbol, input.shape, output.shape, initializer,
+ mx.cpu())
- params <- mx.model.init.params(symbol, input.shape, NULL, initializer, mx.cpu())
+ kvstore <- mxnet:::mx.model.create.kvstore(kvstore, params$arg.params, length(ctx),
+ verbose = verbose)
### Execute training - rnn.model.R
model <- mx.model.train.rnn.buckets(sym_list = sym_list, input.shape = input.shape,
- arg.params = params$arg.params, aux.params = params$aux.params, optimizer = optimizer,
- train.data = train.data, eval.data = eval.data, verbose = verbose, begin.round = begin.round,
- end.round = end.round, metric = metric, ctx = ctx, batch.end.callback = batch.end.callback,
- epoch.end.callback = epoch.end.callback)
+ output.shape = output.shape, arg.params = params$arg.params, aux.params = params$aux.params,
+ optimizer = optimizer, train.data = train.data, eval.data = eval.data, verbose = verbose,
+ begin.round = begin.round, end.round = num.round, metric = metric, ctx = ctx,
+ batch.end.callback = batch.end.callback, epoch.end.callback = epoch.end.callback,
+ kvstore = kvstore)
return(model)
}
@@ -193,10 +206,3 @@ mx.model.check.arguments <- function(symbol) {
}
return(c(data, label))
}
-
-# filter out null, keep the names
-mx.util.filter.null <- function(lst) {
- lst[!sapply(lst, is.null)]
-}
-
-
diff --git a/example/rnn/bucket_R/rnn.train.R b/example/rnn/bucket_R/rnn.train.R
index 962430c..b833b2b 100644
--- a/example/rnn/bucket_R/rnn.train.R
+++ b/example/rnn/bucket_R/rnn.train.R
@@ -4,30 +4,57 @@ source("rnn.R")
# Internal function to do multiple device training on RNN
mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, input.shape,
- begin.round, end.round, optimizer, train.data, eval.data, metric, epoch.end.callback,
- batch.end.callback, verbose = TRUE) {
+ output.shape, begin.round, end.round, optimizer, train.data, eval.data, metric,
+ epoch.end.callback, batch.end.callback, kvstore, verbose = TRUE) {
symbol <- sym_list[[names(train.data$bucketID)]]
input.names <- names(input.shape)
+ output.names <- names(output.shape)
arg.names <- names(arg.params)
+ ndevice <- length(ctx)
+ if (verbose)
+ message(paste0("Start training with ", ndevice, " devices"))
+ input_slice <- mxnet:::mx.model.slice.shape(input.shape, ndevice)
+ output_slice <- mxnet:::mx.model.slice.shape(output.shape, ndevice)
+
+
# Grad request
grad_req <- rep("write", length(symbol$arguments))
+ # grad_null_idx <- match(c(input.names, output.names), symbol$arguments)
grad_null_idx <- match(input.names, symbol$arguments)
grad_req[grad_null_idx] <- "null"
# Arg array order
- update_names <- c(input.names, arg.names)
+ update_names <- c(input.names, output.names, arg.names)
arg_update_idx <- match(symbol$arguments, update_names)
- s <- sapply(input.shape, function(shape) {
- mx.nd.zeros(shape = shape, ctx = mx.cpu())
+ train.execs <- lapply(1:ndevice, function(i) {
+ s <- sapply(append(input_slice[[i]]$shape, output_slice[[i]]$shape), function(shape) {
+ mx.nd.zeros(shape = shape, ctx = mx.cpu())
+ })
+ mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, arg.params)[arg_update_idx],
+ aux.arrays = aux.params, ctx = mx.cpu(), grad.req = grad_req)
})
- train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, arg.params)[arg_update_idx],
- aux.arrays = aux.params, ctx = ctx, grad.req = grad_req)
+ # KVStore related stuffs
+ params.index <- as.integer(mxnet:::mx.util.filter.null(lapply(1:length(train.execs[[1]]$ref.grad.arrays),
+ function(k) {
+ if (!is.null(train.execs[[1]]$ref.grad.arrays[[k]])) k else NULL
+ })))
+ update.on.kvstore <- FALSE
+ if (!is.null(kvstore) && kvstore$update.on.kvstore) {
+ update.on.kvstore <- TRUE
+ kvstore$set.optimizer(optimizer)
+ } else {
+ updaters <- lapply(1:ndevice, function(i) {
+ mx.opt.get.updater(optimizer, train.execs[[i]]$ref.arg.arrays)
+ })
+ }
- updaters <- mx.opt.get.updater(optimizer, train.exec$ref.arg.arrays)
+ if (!is.null(kvstore)) {
+ kvstore$init(params.index, train.execs[[1]]$ref.arg.arrays[params.index])
+ }
for (iteration in begin.round:end.round) {
nbatch <- 0
@@ -36,26 +63,67 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in
}
train.data$reset()
while (train.data$iter.next()) {
- dlist <- train.data$value()[input.names]
+ dlist <- train.data$value() #[input.names]
symbol <- sym_list[[names(train.data$bucketID)]]
+ slices <- lapply(1:ndevice, function(i) {
+ s <- input_slice[[i]]
+ ret <- sapply(names(dlist), function(n) {
+ mxnet:::mx.nd.slice(dlist[[n]], s$begin, s$end)
+ })
+ return(ret)
+ })
- train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(dlist,
- train.exec$arg.arrays[arg.names])[arg_update_idx], aux.arrays = train.exec$aux.arrays,
- ctx = ctx, grad.req = grad_req)
+ train.execs <- lapply(1:ndevice, function(i) {
+ s <- slices[[i]]
+ mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, train.execs[[i]]$arg.arrays[arg.names])[arg_update_idx],
+ aux.arrays = train.execs[[i]]$aux.arrays, ctx = ctx[[i]], grad.req = grad_req)
+ })
- mx.exec.forward(train.exec, is.train = TRUE)
+ for (texec in train.execs) {
+ mx.exec.forward(texec, is.train = TRUE)
+ }
- # copy outputs to CPU
- out.preds <- mx.nd.copyto(train.exec$ref.outputs[[1]], mx.cpu())
+ out.preds <- lapply(train.execs, function(texec) {
+ mx.nd.copyto(texec$ref.outputs[[1]], mx.cpu())
+ })
- mx.exec.backward(train.exec)
+ for (texec in train.execs) {
+ mx.exec.backward(texec)
+ }
- arg.blocks <- updaters(train.exec$ref.arg.arrays, train.exec$ref.grad.arrays)
- mx.exec.update.arg.arrays(train.exec, arg.blocks, skip.null = TRUE)
+ if (!is.null(kvstore)) {
+ # push the gradient
+ kvstore$push(params.index, lapply(train.execs, function(texec) {
+ texec$ref.grad.arrays[params.index]
+ }), -params.index)
+ }
+ if (update.on.kvstore) {
+ # pull back weight
+ kvstore$pull(params.index, lapply(train.execs, function(texec) {
+ texec$ref.arg.arrays[params.index]
+ }), -params.index)
+ } else {
+ # pull back gradient sums
+ if (!is.null(kvstore)) {
+ kvstore$pull(params.index, lapply(train.execs, function(texec) {
+ texec$ref.grad.arrays[params.index]
+ }), -params.index)
+ }
+ arg.blocks <- lapply(1:ndevice, function(i) {
+ updaters[[i]](train.execs[[i]]$ref.arg.arrays, train.execs[[i]]$ref.grad.arrays)
+ })
+ for (i in 1:ndevice) {
+ mx.exec.update.arg.arrays(train.execs[[i]], arg.blocks[[i]], skip.null = TRUE)
+ }
+ }
# Update the evaluation metrics
if (!is.null(metric)) {
- train.metric <- metric$update(dlist$label, out.preds, train.metric)
+ # train.metric <- metric$update(dlist$label, out.preds, train.metric)
+ for (i in 1:ndevice) {
+ train.metric <- metric$update(slices[[i]][[length(slices[[i]])]],
+ out.preds[[i]], train.metric)
+ }
}
nbatch <- nbatch + 1
@@ -78,19 +146,37 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in
eval.data$reset()
while (eval.data$iter.next()) {
# Get input data slice
- dlist <- eval.data$value()[input.names]
+ dlist <- eval.data$value() #[input.names]
symbol <- sym_list[[names(eval.data$bucketID)]]
- train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(dlist,
- train.exec$arg.arrays[arg.names])[arg_update_idx], aux.arrays = train.exec$aux.arrays,
- ctx = ctx, grad.req = grad_req)
+ slices <- lapply(1:ndevice, function(i) {
+ s <- input_slice[[i]]
+ ret <- sapply(names(dlist), function(n) {
+ mxnet:::mx.nd.slice(dlist[[n]], s$begin, s$end)
+ })
+ return(ret)
+ })
- mx.exec.forward(train.exec, is.train = FALSE)
+
+ train.execs <- lapply(1:ndevice, function(i) {
+ s <- slices[[i]]
+ mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, train.execs[[i]]$arg.arrays[arg.names])[arg_update_idx],
+ aux.arrays = train.execs[[i]]$aux.arrays, ctx = ctx[[i]], grad.req = grad_req)
+ })
+
+ for (texec in train.execs) {
+ mx.exec.forward(texec, is.train = FALSE)
+ }
# copy outputs to CPU
- out.preds <- mx.nd.copyto(train.exec$ref.outputs[[1]], mx.cpu())
+ out.preds <- lapply(train.execs, function(texec) {
+ mx.nd.copyto(texec$ref.outputs[[1]], mx.cpu())
+ })
if (!is.null(metric)) {
- eval.metric <- metric$update(dlist$label, out.preds, eval.metric)
+ for (i in 1:ndevice) {
+ eval.metric <- metric$update(slices[[i]][[length(slices[[i]])]],
+ out.preds[[i]], eval.metric)
+ }
}
}
@@ -105,7 +191,7 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in
eval.metric <- NULL
}
# get the model out
- model <- mxnet:::mx.model.extract.model(symbol, list(train.exec))
+ model <- mxnet:::mx.model.extract.model(symbol, train.execs)
epoch_continue <- TRUE
if (!is.null(epoch.end.callback)) {
--
To stop receiving notification emails like this one, please contact
"commits@mxnet.apache.org" <co...@mxnet.apache.org>.