You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mxnet.apache.org by qk...@apache.org on 2017/08/02 21:51:36 UTC

[incubator-mxnet] branch master updated: [R] RNN bucketing with multiple devices. (#7315)

This is an automated email from the ASF dual-hosted git repository.

qkou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-mxnet.git


The following commit(s) were added to refs/heads/master by this push:
     new 5976973  [R] RNN bucketing with multiple devices. (#7315)
5976973 is described below

commit 59769736a402b834b5d53bcdaae895ac4a069e12
Author: Qiang Kou (KK) <qk...@qkou.info>
AuthorDate: Wed Aug 2 21:51:34 2017 +0000

    [R] RNN bucketing with multiple devices. (#7315)
---
 example/rnn/bucket_R/aclImdb_lstm_classification.R |  55 +++-----
 example/rnn/bucket_R/mx.io.bucket.iter.R           |   6 +-
 example/rnn/bucket_R/rnn.R                         |  44 ++++---
 example/rnn/bucket_R/rnn.train.R                   | 140 +++++++++++++++++----
 4 files changed, 157 insertions(+), 88 deletions(-)

diff --git a/example/rnn/bucket_R/aclImdb_lstm_classification.R b/example/rnn/bucket_R/aclImdb_lstm_classification.R
index aaa6d38..bb5eaac 100644
--- a/example/rnn/bucket_R/aclImdb_lstm_classification.R
+++ b/example/rnn/bucket_R/aclImdb_lstm_classification.R
@@ -11,51 +11,30 @@ vocab <- length(corpus_bucketed_test$dic)
 ### Create iterators
 batch.size <- 64
 
-train.data <- mx.io.bucket.iter(buckets = corpus_bucketed_train$buckets,
-                                batch.size = batch.size,
-                                data.mask.element = 0,
-                                shuffle = TRUE)
+num.round <- 16
 
-eval.data <- mx.io.bucket.iter(buckets = corpus_bucketed_test$buckets,
-                               batch.size = batch.size,
-                               data.mask.element = 0,
-                               shuffle = FALSE)
+train.data <- mx.io.bucket.iter(buckets = corpus_bucketed_train$buckets, batch.size = batch.size, 
+  data.mask.element = 0, shuffle = TRUE)
+
+eval.data <- mx.io.bucket.iter(buckets = corpus_bucketed_test$buckets, batch.size = batch.size, 
+  data.mask.element = 0, shuffle = FALSE)
 
 mx.set.seed(0)
+optimizer <- mx.opt.create("adadelta", rho = 0.92, epsilon = 1e-06, wd = 2e-04, clip_gradient = NULL, 
+  rescale.grad = 1/batch.size)
+
+model_sentiment_lstm <- mx.rnn.buckets(train.data = train.data, begin.round = 1, 
+  num.round = num.round, ctx = mx.cpu(), metric = mx.metric.accuracy, optimizer = optimizer, 
+  num.rnn.layer = 2, num.embed = 16, num.hidden = 24, num.label = 2, input.size = vocab, 
+  initializer = mx.init.Xavier(rnd_type = "gaussian", factor_type = "in", magnitude = 2), 
+  dropout = 0.25, config = "seq-to-one", batch.end.callback = mx.callback.log.train.metric(period = 50), 
+  verbose = TRUE)
 
-end.round <- 16
-
-optimizer <- mx.opt.create("adadelta",
-                           rho = 0.92,
-                           epsilon = 1e-06,
-                           wd = 2e-04,
-                           clip_gradient = NULL,
-                           rescale.grad = 1/batch.size)
-
-model_sentiment_lstm <- mx.rnn.buckets(train.data = train.data,
-                                       begin.round = 1,
-                                       end.round = end.round,
-                                       ctx = mx.cpu(),
-                                       metric = mx.metric.accuracy,
-                                       optimizer = optimizer,
-                                       num.rnn.layer = 2,
-                                       num.embed = 16,
-                                       num.hidden = 24,
-                                       num.label = 2,
-                                       input.size = vocab,
-                                       initializer = mx.init.Xavier(rnd_type = "gaussian",
-                                                                    factor_type = "in",
-                                                                    magnitude = 2),
-                                       dropout = 0.25,
-                                       config = "seq-to-one",
-                                       batch.end.callback = mx.callback.log.train.metric(period = 50),
-                                       verbose = TRUE)
-
-mx.model.save(model_sentiment_lstm, prefix = "model_sentiment_lstm", iteration = end.round)
+mx.model.save(model_sentiment_lstm, prefix = "model_sentiment_lstm", iteration = num.round)
 
 source("rnn.infer.R")
 
-model <- mx.model.load("model_sentiment_lstm", iteration = end.round)
+model <- mx.model.load("model_sentiment_lstm", iteration = num.round)
 
 pred <- mx.rnn.infer.buckets(infer_iter = eval.data, model, "seq-to-one", ctx = mx.cpu())
 
diff --git a/example/rnn/bucket_R/mx.io.bucket.iter.R b/example/rnn/bucket_R/mx.io.bucket.iter.R
index 887247a..61f8795 100644
--- a/example/rnn/bucket_R/mx.io.bucket.iter.R
+++ b/example/rnn/bucket_R/mx.io.bucket.iter.R
@@ -64,16 +64,14 @@ BucketIter <- setRefClass("BucketIter", fields = c("buckets", "bucket.names", "b
   # to appropriate sequence length)
   idx <- (.self$bucketID - 1) * (.self$batch.size) + (1:batch.size)
   data <- .self$buckets[[names(.self$bucketID)]]$data[, idx, drop = F]
-  data_mask <- as.integer(names(.self$bucketID)) - apply(data == .self$data.mask.element, 
-    2, sum)
   data_mask_array <- (!data == 0)
   if (length(dim(.self$buckets[[names(.self$bucketID)]]$label)) == 0) {
     label <- .self$buckets[[names(.self$bucketID)]]$label[idx]
   } else {
     label <- .self$buckets[[names(.self$bucketID)]]$label[, idx, drop = F]
   }
-  return(list(data = mx.nd.array(data), label = mx.nd.array(label), data.mask = mx.nd.array(data_mask), 
-    data.mask.array = mx.nd.array(data_mask_array)))
+  return(list(data = mx.nd.array(data), data.mask.array = mx.nd.array(data_mask_array), 
+    label = mx.nd.array(label)))
 }, finalize = function() {
 }))
 
diff --git a/example/rnn/bucket_R/rnn.R b/example/rnn/bucket_R/rnn.R
index f55272f..ea02b95 100644
--- a/example/rnn/bucket_R/rnn.R
+++ b/example/rnn/bucket_R/rnn.R
@@ -33,7 +33,6 @@ rnn.unroll <- function(num.rnn.layer, seq.len, input.size, num.embed, num.hidden
   # embeding layer
   label <- mx.symbol.Variable("label")
   data <- mx.symbol.Variable("data")
-  data_mask <- mx.symbol.Variable("data.mask")
   data_mask_array <- mx.symbol.Variable("data.mask.array")
   data_mask_array <- mx.symbol.stop_gradient(data_mask_array, name = "data.mask.array")
   
@@ -112,8 +111,8 @@ rnn.unroll <- function(num.rnn.layer, seq.len, input.size, num.embed, num.hidden
 mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidden, 
   num.embed, num.label, input.size, ctx = NULL, num.round = 1, initializer = mx.init.uniform(0.01), 
   dropout = 0, config = "one-to-one", optimizer = "sgd", batch.end.callback = NULL, 
-  epoch.end.callback = NULL, begin.round = 1, end.round = 1, metric = mx.metric.rmse, 
-  cell.type = "lstm", verbose = FALSE) {
+  epoch.end.callback = NULL, begin.round = 1, metric = mx.metric.rmse, cell.type = "lstm", 
+  kvstore = "local", verbose = FALSE) {
   
   if (!train.data$iter.next()) {
     train.data$reset()
@@ -131,8 +130,11 @@ mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidd
   
   if (is.null(ctx)) 
     ctx <- mx.ctx.default()
-  if (!is.mx.context(ctx)) 
-    stop("ctx must be mx.context")
+  if (is.mx.context(ctx)) {
+    ctx <- list(ctx)
+  }
+  if (!is.list(ctx)) 
+    stop("ctx must be mx.context or list of mx.context")
   if (is.character(optimizer)) {
     if (is.numeric(input.shape)) {
       ndim <- length(input.shape)
@@ -155,17 +157,28 @@ mx.rnn.buckets <- function(train.data, eval.data = NULL, num.rnn.layer, num.hidd
   symbol <- sym_list[[names(train.data$bucketID)]]
   
   arg.names <- symbol$arguments
-  input.shape <- lapply(train.data$value(), dim)
-  input.shape <- input.shape[names(input.shape) %in% arg.names]
+  input.names <- c("data", "data.mask.array")
+  input.shape <- sapply(input.names, function(n) {
+    dim(train.data$value()[[n]])
+  }, simplify = FALSE)
+  output.names <- "label"
+  output.shape <- sapply(output.names, function(n) {
+    dim(train.data$value()[[n]])
+  }, simplify = FALSE)
+  
+  params <- mx.model.init.params(symbol, input.shape, output.shape, initializer, 
+    mx.cpu())
   
-  params <- mx.model.init.params(symbol, input.shape, NULL, initializer, mx.cpu())
+  kvstore <- mxnet:::mx.model.create.kvstore(kvstore, params$arg.params, length(ctx), 
+    verbose = verbose)
   
   ### Execute training - rnn.model.R
   model <- mx.model.train.rnn.buckets(sym_list = sym_list, input.shape = input.shape, 
-    arg.params = params$arg.params, aux.params = params$aux.params, optimizer = optimizer, 
-    train.data = train.data, eval.data = eval.data, verbose = verbose, begin.round = begin.round, 
-    end.round = end.round, metric = metric, ctx = ctx, batch.end.callback = batch.end.callback, 
-    epoch.end.callback = epoch.end.callback)
+    output.shape = output.shape, arg.params = params$arg.params, aux.params = params$aux.params, 
+    optimizer = optimizer, train.data = train.data, eval.data = eval.data, verbose = verbose, 
+    begin.round = begin.round, end.round = num.round, metric = metric, ctx = ctx, 
+    batch.end.callback = batch.end.callback, epoch.end.callback = epoch.end.callback, 
+    kvstore = kvstore)
   
   return(model)
 }
@@ -193,10 +206,3 @@ mx.model.check.arguments <- function(symbol) {
   }
   return(c(data, label))
 }
-
-# filter out null, keep the names
-mx.util.filter.null <- function(lst) {
-  lst[!sapply(lst, is.null)]
-}
-
-
diff --git a/example/rnn/bucket_R/rnn.train.R b/example/rnn/bucket_R/rnn.train.R
index 962430c..b833b2b 100644
--- a/example/rnn/bucket_R/rnn.train.R
+++ b/example/rnn/bucket_R/rnn.train.R
@@ -4,30 +4,57 @@ source("rnn.R")
 
 # Internal function to do multiple device training on RNN
 mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, input.shape, 
-  begin.round, end.round, optimizer, train.data, eval.data, metric, epoch.end.callback, 
-  batch.end.callback, verbose = TRUE) {
+  output.shape, begin.round, end.round, optimizer, train.data, eval.data, metric, 
+  epoch.end.callback, batch.end.callback, kvstore, verbose = TRUE) {
   symbol <- sym_list[[names(train.data$bucketID)]]
   
   input.names <- names(input.shape)
+  output.names <- names(output.shape)
   arg.names <- names(arg.params)
   
+  ndevice <- length(ctx)
+  if (verbose) 
+    message(paste0("Start training with ", ndevice, " devices"))
+  input_slice <- mxnet:::mx.model.slice.shape(input.shape, ndevice)
+  output_slice <- mxnet:::mx.model.slice.shape(output.shape, ndevice)
+  
+  
   # Grad request
   grad_req <- rep("write", length(symbol$arguments))
+  # grad_null_idx <- match(c(input.names, output.names), symbol$arguments)
   grad_null_idx <- match(input.names, symbol$arguments)
   grad_req[grad_null_idx] <- "null"
   
   # Arg array order
-  update_names <- c(input.names, arg.names)
+  update_names <- c(input.names, output.names, arg.names)
   arg_update_idx <- match(symbol$arguments, update_names)
   
-  s <- sapply(input.shape, function(shape) {
-    mx.nd.zeros(shape = shape, ctx = mx.cpu())
+  train.execs <- lapply(1:ndevice, function(i) {
+    s <- sapply(append(input_slice[[i]]$shape, output_slice[[i]]$shape), function(shape) {
+      mx.nd.zeros(shape = shape, ctx = mx.cpu())
+    })
+    mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, arg.params)[arg_update_idx], 
+      aux.arrays = aux.params, ctx = mx.cpu(), grad.req = grad_req)
   })
   
-  train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, arg.params)[arg_update_idx], 
-    aux.arrays = aux.params, ctx = ctx, grad.req = grad_req)
+  # KVStore related stuffs
+  params.index <- as.integer(mxnet:::mx.util.filter.null(lapply(1:length(train.execs[[1]]$ref.grad.arrays), 
+    function(k) {
+      if (!is.null(train.execs[[1]]$ref.grad.arrays[[k]])) k else NULL
+    })))
+  update.on.kvstore <- FALSE
+  if (!is.null(kvstore) && kvstore$update.on.kvstore) {
+    update.on.kvstore <- TRUE
+    kvstore$set.optimizer(optimizer)
+  } else {
+    updaters <- lapply(1:ndevice, function(i) {
+      mx.opt.get.updater(optimizer, train.execs[[i]]$ref.arg.arrays)
+    })
+  }
   
-  updaters <- mx.opt.get.updater(optimizer, train.exec$ref.arg.arrays)
+  if (!is.null(kvstore)) {
+    kvstore$init(params.index, train.execs[[1]]$ref.arg.arrays[params.index])
+  }
   
   for (iteration in begin.round:end.round) {
     nbatch <- 0
@@ -36,26 +63,67 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in
     }
     train.data$reset()
     while (train.data$iter.next()) {
-      dlist <- train.data$value()[input.names]
+      dlist <- train.data$value()  #[input.names]
       symbol <- sym_list[[names(train.data$bucketID)]]
+      slices <- lapply(1:ndevice, function(i) {
+        s <- input_slice[[i]]
+        ret <- sapply(names(dlist), function(n) {
+          mxnet:::mx.nd.slice(dlist[[n]], s$begin, s$end)
+        })
+        return(ret)
+      })
       
-      train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(dlist, 
-        train.exec$arg.arrays[arg.names])[arg_update_idx], aux.arrays = train.exec$aux.arrays, 
-        ctx = ctx, grad.req = grad_req)
+      train.execs <- lapply(1:ndevice, function(i) {
+        s <- slices[[i]]
+        mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, train.execs[[i]]$arg.arrays[arg.names])[arg_update_idx], 
+          aux.arrays = train.execs[[i]]$aux.arrays, ctx = ctx[[i]], grad.req = grad_req)
+      })
       
-      mx.exec.forward(train.exec, is.train = TRUE)
+      for (texec in train.execs) {
+        mx.exec.forward(texec, is.train = TRUE)
+      }
       
-      # copy outputs to CPU
-      out.preds <- mx.nd.copyto(train.exec$ref.outputs[[1]], mx.cpu())
+      out.preds <- lapply(train.execs, function(texec) {
+        mx.nd.copyto(texec$ref.outputs[[1]], mx.cpu())
+      })
       
-      mx.exec.backward(train.exec)
+      for (texec in train.execs) {
+        mx.exec.backward(texec)
+      }
       
-      arg.blocks <- updaters(train.exec$ref.arg.arrays, train.exec$ref.grad.arrays)
-      mx.exec.update.arg.arrays(train.exec, arg.blocks, skip.null = TRUE)
+      if (!is.null(kvstore)) {
+        # push the gradient
+        kvstore$push(params.index, lapply(train.execs, function(texec) {
+          texec$ref.grad.arrays[params.index]
+        }), -params.index)
+      }
+      if (update.on.kvstore) {
+        # pull back weight
+        kvstore$pull(params.index, lapply(train.execs, function(texec) {
+          texec$ref.arg.arrays[params.index]
+        }), -params.index)
+      } else {
+        # pull back gradient sums
+        if (!is.null(kvstore)) {
+          kvstore$pull(params.index, lapply(train.execs, function(texec) {
+          texec$ref.grad.arrays[params.index]
+          }), -params.index)
+        }
+        arg.blocks <- lapply(1:ndevice, function(i) {
+          updaters[[i]](train.execs[[i]]$ref.arg.arrays, train.execs[[i]]$ref.grad.arrays)
+        })
+        for (i in 1:ndevice) {
+          mx.exec.update.arg.arrays(train.execs[[i]], arg.blocks[[i]], skip.null = TRUE)
+        }
+      }
       
       # Update the evaluation metrics
       if (!is.null(metric)) {
-        train.metric <- metric$update(dlist$label, out.preds, train.metric)
+        # train.metric <- metric$update(dlist$label, out.preds, train.metric)
+        for (i in 1:ndevice) {
+          train.metric <- metric$update(slices[[i]][[length(slices[[i]])]], 
+          out.preds[[i]], train.metric)
+        }
       }
       
       nbatch <- nbatch + 1
@@ -78,19 +146,37 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in
       eval.data$reset()
       while (eval.data$iter.next()) {
         # Get input data slice
-        dlist <- eval.data$value()[input.names]
+        dlist <- eval.data$value()  #[input.names]
         symbol <- sym_list[[names(eval.data$bucketID)]]
-        train.exec <- mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(dlist, 
-          train.exec$arg.arrays[arg.names])[arg_update_idx], aux.arrays = train.exec$aux.arrays, 
-          ctx = ctx, grad.req = grad_req)
+        slices <- lapply(1:ndevice, function(i) {
+          s <- input_slice[[i]]
+          ret <- sapply(names(dlist), function(n) {
+          mxnet:::mx.nd.slice(dlist[[n]], s$begin, s$end)
+          })
+          return(ret)
+        })
         
-        mx.exec.forward(train.exec, is.train = FALSE)
+        
+        train.execs <- lapply(1:ndevice, function(i) {
+          s <- slices[[i]]
+          mxnet:::mx.symbol.bind(symbol = symbol, arg.arrays = c(s, train.execs[[i]]$arg.arrays[arg.names])[arg_update_idx], 
+          aux.arrays = train.execs[[i]]$aux.arrays, ctx = ctx[[i]], grad.req = grad_req)
+        })
+        
+        for (texec in train.execs) {
+          mx.exec.forward(texec, is.train = FALSE)
+        }
         
         # copy outputs to CPU
-        out.preds <- mx.nd.copyto(train.exec$ref.outputs[[1]], mx.cpu())
+        out.preds <- lapply(train.execs, function(texec) {
+          mx.nd.copyto(texec$ref.outputs[[1]], mx.cpu())
+        })
         
         if (!is.null(metric)) {
-          eval.metric <- metric$update(dlist$label, out.preds, eval.metric)
+          for (i in 1:ndevice) {
+          eval.metric <- metric$update(slices[[i]][[length(slices[[i]])]], 
+            out.preds[[i]], eval.metric)
+          }
         }
       }
       
@@ -105,7 +191,7 @@ mx.model.train.rnn.buckets <- function(ctx, sym_list, arg.params, aux.params, in
       eval.metric <- NULL
     }
     # get the model out
-    model <- mxnet:::mx.model.extract.model(symbol, list(train.exec))
+    model <- mxnet:::mx.model.extract.model(symbol, train.execs)
     
     epoch_continue <- TRUE
     if (!is.null(epoch.end.callback)) {

-- 
To stop receiving notification emails like this one, please contact
['"commits@mxnet.apache.org" <co...@mxnet.apache.org>'].