You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by ni...@apache.org on 2019/03/24 04:52:30 UTC

[systemml] branch master updated: [SYSTEMML-540] Invoke update immediately after backward call at the script-level.

This is an automated email from the ASF dual-hosted git repository.

niketanpansare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new b49ac76  [SYSTEMML-540] Invoke update immediately after backward call at the script-level.
b49ac76 is described below

commit b49ac760c180baa0582c2168c4b58fb3c0108bc4
Author: Niketan Pansare <np...@us.ibm.com>
AuthorDate: Sat Mar 23 21:51:45 2019 -0700

    [SYSTEMML-540] Invoke update immediately after backward call at the script-level.
    
    - This reduces the chance of unnecessary evictions, especially when there are statement block cuts.
    - The configuration property `perform_fused_backward_update` allows the user to toggle this behavior and control the script-generation process.
    - Also, updated the release creation document to ensure that untracked files are not included in the artifacts.
    - For Resnet-200, the eviction time was reduced from 173.488 seconds to 60.048 seconds with a minibatch size of 96.
---
 docs/release-creation-process.md                   |  3 +
 src/main/python/systemml/mllearn/estimators.py     |  8 ++-
 .../scala/org/apache/sysml/api/dl/Caffe2DML.scala  | 76 ++++++++++++++++++----
 .../scala/org/apache/sysml/api/dl/CaffeLayer.scala |  2 +
 4 files changed, 76 insertions(+), 13 deletions(-)

diff --git a/docs/release-creation-process.md b/docs/release-creation-process.md
index 3115390..bf05000 100644
--- a/docs/release-creation-process.md
+++ b/docs/release-creation-process.md
@@ -42,6 +42,9 @@ Step 1: Prepare the release.
 
 	# Extract latest code to a directory
 	<GitRepoHome>
+	
+	# Check if there are any untracked files (created by the unit tests) and remove them to avoid packing them in the artifacts
+	git status
 
 	# Go to dev/release directory
 	cd <GitRepoHome>/dev/release
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 8d1e164..456280b 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -922,7 +922,8 @@ class Caffe2DML(BaseSystemMLClassifier):
             self.estimator.setWeightsToIgnore(ignore_weights)
 
     def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None,
-            output_activations=None, perform_one_hot_encoding=None, parfor_parameters=None, inline_nn_library=None, use_builtin_lstm_fn=None):
+            output_activations=None, perform_one_hot_encoding=None, parfor_parameters=None, inline_nn_library=None, use_builtin_lstm_fn=None,
+            perform_fused_backward_update=None):
         """
         Set input to Caffe2DML
 
@@ -933,10 +934,11 @@ class Caffe2DML(BaseSystemMLClassifier):
         test_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch)
         parallel_batches: number of parallel batches
         output_activations: (developer flag) directory to output activations of each layer as csv while prediction. To be used only in batch mode (default: None)
-        perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: False)
+        perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: True)
         parfor_parameters: dictionary for parfor parameters when using allreduce-style algorithms (default: "")
         inline_nn_library: whether to inline the NN library when generating DML using Caffe2DML (default: False)
         use_builtin_lstm_fn: whether to use builtin lstm function for LSTM layer (default: True)
+        perform_fused_backward_update: whether to perform update immediately after backward pass at the script level. Supported for minibatch and batch algorithms. (default: True)
         """
         if debug is not None:
             self.estimator.setInput("$debug", str(debug).upper())
@@ -950,6 +952,8 @@ class Caffe2DML(BaseSystemMLClassifier):
             self.estimator.setInput("$parallel_batches", str(parallel_batches))
         if use_builtin_lstm_fn is not None:
             self.estimator.setInput("$use_builtin_lstm_fn", str(use_builtin_lstm_fn).upper())
+        if perform_fused_backward_update is not None:
+            self.estimator.setInput("$perform_fused_backward_update", str(perform_fused_backward_update).upper())
         if output_activations is not None:
             self.estimator.setInput(
                 "$output_activations",
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index e480dfc..c5a20db 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -304,14 +304,27 @@ class Caffe2DML(val sc: SparkContext,
     net.getLayers.map(layer => {net.getCaffeLayer(layer).debugLayer = isDebug})
     net.getLayers.map(layer => {net.getCaffeLayer(layer).caffe2dmlObj = this})
     net.getLayers.filter(layer => net.getCaffeLayer(layer).isInstanceOf[LSTM]).map(layer => {
-      if (inputs.containsKey("$use_builtin_lstm_fn")) 
-        net.getCaffeLayer(layer).asInstanceOf[LSTM].useBuiltinFunction(inputs.get("$use_builtin_lstm_fn").toLowerCase.toBoolean)
+      net.getCaffeLayer(layer).asInstanceOf[LSTM].useBuiltinFunction(getInputBooleanValue("$use_builtin_lstm_fn")) 
      })
   }
   
   // Comma is included
   def getParforParameters():String = if (inputs.containsKey("$parfor_parameters")) inputs.get("$parfor_parameters") else ""
 
+  def getInputBooleanValue(key:String):Boolean = {
+    if(inputs.containsKey(key))
+      return inputs.get(key).toLowerCase.toBoolean
+    else {
+      key match {
+        case "$debug" => false
+        case "$perform_one_hot_encoding" => true
+        case "$inline_nn_library" => false
+        case "$use_builtin_lstm_fn" => true
+        case "$perform_fused_backward_update" => true
+        case _ => throw new DMLRuntimeException("Unsupported input:" + key)
+      }
+    } 
+  }
   // ================================================================================================
   // The below method parses the provided network and solver file and generates DML script.
   def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = {
@@ -320,16 +333,18 @@ class Caffe2DML(val sc: SparkContext,
     reset // Reset the state of DML generator for training script.
 
     // Flags passed by user
-    val DEBUG_TRAINING = if (inputs.containsKey("$debug")) inputs.get("$debug").toLowerCase.toBoolean else false
-    Caffe2DML.INLINE_NN_LIBRARY = if (inputs.containsKey("$inline_nn_library")) inputs.get("$inline_nn_library").toLowerCase.toBoolean else false
+    val DEBUG_TRAINING = getInputBooleanValue("$debug")
+    Caffe2DML.INLINE_NN_LIBRARY = getInputBooleanValue("$inline_nn_library")
     assign(tabDMLScript, "debug", if (DEBUG_TRAINING) "TRUE" else "FALSE")
     setDebugFlags(DEBUG_TRAINING)
 
     appendHeaders(net, solver, true) // Appends DML corresponding to source and externalFunction statements.
-    val performOneHotEncoding = !inputs.containsKey("$perform_one_hot_encoding") || inputs.get("$perform_one_hot_encoding").toBoolean
+    val performOneHotEncoding = getInputBooleanValue("$perform_one_hot_encoding")
     readInputData(net, true, performOneHotEncoding)         // Read X_full and y_full
     // Initialize the layers and solvers. Reads weights and bias if $weights is set.
     initWeights(net, solver, inputs.containsKey("$weights"), layersToIgnore)
+    
+    val performFusedBackwardUpdate = getInputBooleanValue("$perform_fused_backward_update")
 
     // Split into training and validation set
     // Initializes Caffe2DML.X, Caffe2DML.y, Caffe2DML.XVal, Caffe2DML.yVal and Caffe2DML.numImages
@@ -355,7 +370,13 @@ class Caffe2DML(val sc: SparkContext,
           getTrainingBatch(tabDMLScript)
           // -------------------------------------------------------
           // Perform forward, backward and update on minibatch
-          forward; backward; update
+          forward;
+          if(performFusedBackwardUpdate) {
+            backwardUpdate
+          }
+          else {
+            backward; update
+          }
           // -------------------------------------------------------
           if(solverParam.getDisplay > 0) {
             ifBlock("iter  %% " + solverParam.getDisplay + " == 0") {
@@ -385,7 +406,13 @@ class Caffe2DML(val sc: SparkContext,
           assign(tabDMLScript, "yb", Caffe2DML.y)
           // -------------------------------------------------------
           // Perform forward, backward and update on entire dataset
-          forward; backward; update
+          forward
+          if(performFusedBackwardUpdate) {
+            backwardUpdate
+          }
+          else {
+            backward; update
+          }
           // -------------------------------------------------------
           if(solverParam.getDisplay > 0) {
             // Show training/validation loss every epoch
@@ -416,6 +443,10 @@ class Caffe2DML(val sc: SparkContext,
             rightIndexing(tabDMLScript, "Xb", Caffe2DML.X, "beg", "end")
             rightIndexing(tabDMLScript, "yb", Caffe2DML.y, "beg", "end")
             forward; backward
+            if(performFusedBackwardUpdate && inputs.containsKey("$perform_fused_backward_update")) {
+              // Warn the user only if the user explicitly asks for it
+              Caffe2DML.LOG.warn("Fused backward update is not supported for allreduce_parallel_batches")
+            }
             flattenGradients
             if(solverParam.getDisplay > 0) {
               ifBlock("(iter + j - 1)  %% " + solverParam.getDisplay + " == 0") {
@@ -447,6 +478,10 @@ class Caffe2DML(val sc: SparkContext,
             assign(tabDMLScript, "Xb", Caffe2DML.X + "[j,]")
             assign(tabDMLScript, "yb", Caffe2DML.y + "[j,]")
             forward; backward
+            if(performFusedBackwardUpdate && inputs.containsKey("$perform_fused_backward_update")) {
+              // Warn the user only if the user explicitly asks for it
+              Caffe2DML.LOG.warn("Fused backward update is not supported for allreduce_parallel_batches")
+            }
             flattenGradients
           }
           aggregateAggGradients
@@ -513,7 +548,7 @@ class Caffe2DML(val sc: SparkContext,
   }
   
   private def displayTrainingLoss(lossLayer: IsLossLayer, performOneHotEncoding:Boolean): Unit = {
-    val DEBUG_TRAINING = if (inputs.containsKey("$debug")) inputs.get("$debug").toLowerCase.toBoolean else false
+    val DEBUG_TRAINING = getInputBooleanValue("$debug")
     tabDMLScript.append("# Compute training loss & accuracy\n")
     assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0")
     lossLayer.computeLoss(dmlScript, numTabs)
@@ -619,6 +654,25 @@ class Caffe2DML(val sc: SparkContext,
     tabDMLScript.append("# Update the parameters\n")
     net.getLayers.map(layer => solver.update(tabDMLScript, net.getCaffeLayer(layer)))
   }
+  private def backwardUpdate(): Unit = {
+    tabDMLScript.append("# Perform backward pass and update the parameters\n")
+    var skipedLayer:String = null
+    for(layer <- net.getLayers.reverse) {
+      val caffeLayer = net.getCaffeLayer(layer)
+      caffeLayer.backward(tabDMLScript, "")
+      if(caffeLayer.isInstanceOf[Scale] && caffeLayer.asInstanceOf[Scale].isPartOfBatchNorm) {
+        skipedLayer = layer // Skip update
+      }
+      else {
+        solver.update(tabDMLScript, caffeLayer) // Perform update of the current layer
+        if(skipedLayer != null) {
+          // And then update the skipped layer (if any)
+          solver.update(tabDMLScript, net.getCaffeLayer(skipedLayer))
+          skipedLayer = null
+        }
+      }
+    }
+  }
   private def initializeGradients(parallel_batches: String): Unit = {
     tabDMLScript.append("# Data structure to store gradients computed in parallel\n")
     if(Caffe2DML.USE_PLUS_EQ) {
@@ -730,13 +784,13 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
 
     reset // Reset the state of DML generator for training script.
 
-    val DEBUG_PREDICTION = if (estimator.inputs.containsKey("$debug")) estimator.inputs.get("$debug").toLowerCase.toBoolean else false
-    Caffe2DML.INLINE_NN_LIBRARY = if (estimator.inputs.containsKey("$inline_nn_library")) estimator.inputs.get("$inline_nn_library").toLowerCase.toBoolean else false
+    val DEBUG_PREDICTION =  estimator.getInputBooleanValue("$debug")
+    Caffe2DML.INLINE_NN_LIBRARY = estimator.getInputBooleanValue("$inline_nn_library")
     assign(tabDMLScript, "debug", if (DEBUG_PREDICTION) "TRUE" else "FALSE")
     estimator.setDebugFlags(DEBUG_PREDICTION)
 
     appendHeaders(net, solver, false) // Appends DML corresponding to source and externalFunction statements.
-    val performOneHotEncoding = !estimator.inputs.containsKey("$perform_one_hot_encoding") || estimator.inputs.get("$perform_one_hot_encoding").toBoolean
+    val performOneHotEncoding = estimator.getInputBooleanValue("$perform_one_hot_encoding")
     readInputData(net, false, performOneHotEncoding)         // Read X_full and y_full
     assign(tabDMLScript, "X", "X_full")
 
diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
index 62323d1..bf86f38 100644
--- a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
@@ -370,6 +370,7 @@ class BatchNorm(val param: LayerParameter, val id: Int, val net: CaffeNetwork) e
       val topLayers = net.getTopLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
       if (topLayers.length != 1 && !topLayers(0).isInstanceOf[Scale]) throw new LanguageException("Only one top layer of type Scale allowed for BatchNorm")
       scaleLayer = topLayers(0).asInstanceOf[Scale]
+      scaleLayer.isPartOfBatchNorm = true
     }
   def numChannels = bottomLayerOutputShape._1
   def Hin         = bottomLayerOutputShape._2
@@ -385,6 +386,7 @@ class Scale(val param: LayerParameter, val id: Int, val net: CaffeNetwork) exten
   override def backward(dmlScript: StringBuilder, outSuffix: String): Unit = assignDoutToDX(dmlScript, outSuffix)
   override def weightShape(): Array[Int]                                   = Array(bottomLayerOutputShape._1.toInt, 1)
   override def biasShape(): Array[Int]                                     = Array(bottomLayerOutputShape._1.toInt, 1)
+  var isPartOfBatchNorm = false
 }
 // ------------------------------------------------------------------