You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@systemml.apache.org by de...@apache.org on 2016/09/08 19:26:30 UTC

incubator-systemml git commit: [SYSTEMML-897] Add old MLContext Spark Shell examples to docs

Repository: incubator-systemml
Updated Branches:
  refs/heads/master 81ebfe004 -> aa851f681


[SYSTEMML-897] Add old MLContext Spark Shell examples to docs

Closes #232.


Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/aa851f68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/aa851f68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/aa851f68

Branch: refs/heads/master
Commit: aa851f68156c7ad33caa9eb3d0270e80d909fc0f
Parents: 81ebfe0
Author: Deron Eriksson <de...@us.ibm.com>
Authored: Thu Sep 8 12:22:36 2016 -0700
Committer: Deron Eriksson <de...@us.ibm.com>
Committed: Thu Sep 8 12:22:36 2016 -0700

----------------------------------------------------------------------
 docs/spark-mlcontext-programming-guide.md | 430 ++++++++++++++++++++++++-
 1 file changed, 428 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/aa851f68/docs/spark-mlcontext-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/spark-mlcontext-programming-guide.md b/docs/spark-mlcontext-programming-guide.md
index d0e1f0f..95a5269 100644
--- a/docs/spark-mlcontext-programming-guide.md
+++ b/docs/spark-mlcontext-programming-guide.md
@@ -34,12 +34,16 @@ The Spark `MLContext` API offers a programmatic interface for interacting with S
 such as Scala, Java, and Python. As a result, it offers a convenient way to interact with SystemML from the Spark
 Shell and from Notebooks such as Jupyter and Zeppelin.
 
-**NOTE: The MLContext API has been redesigned. Currently both the old API and the new API can be used. The old API
-will be deprecated and removed, so please migrate to the new API.**
+**NOTE: The MLContext API has been redesigned for future SystemML releases. The old API is available
+in all versions of SystemML but will be deprecated and removed, so please migrate to the new API.**
 
 
 # Spark Shell Example - NEW API
 
+**NOTE: The new MLContext API will be available in future SystemML releases. It can be used
+by building the project using Maven ('mvn clean package', or 'mvn clean package -P distribution').
+For SystemML version 0.10.0 and earlier, please see the documentation regarding the old API.**
+
 ## Start Spark Shell with SystemML
 
 To use SystemML with Spark Shell, the SystemML jar can be referenced using Spark Shell's `--jars` option.
@@ -1637,6 +1641,428 @@ scala> for (i <- 1 to 5) {
 </div>
 
 
+# Spark Shell Example - OLD API
+
+## Start Spark Shell with SystemML
+
+To use SystemML with the Spark Shell, the SystemML jar can be referenced using the Spark Shell's `--jars` option.
+Instructions to build the SystemML jar can be found in the [SystemML GitHub README](https://github.com/apache/incubator-systemml).
+
+{% highlight bash %}
+./bin/spark-shell --executor-memory 4G --driver-memory 4G --jars SystemML.jar
+{% endhighlight %}
+
+Here is an example of Spark Shell with SystemML and YARN.
+
+{% highlight bash %}
+./bin/spark-shell --master yarn-client --num-executors 3 --driver-memory 5G --executor-memory 5G --executor-cores 4 --jars SystemML.jar
+{% endhighlight %}
+
+
+## Create MLContext
+
+An `MLContext` object can be created by passing its constructor a reference to the `SparkContext`.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> import org.apache.sysml.api.MLContext
+import org.apache.sysml.api.MLContext
+
+scala> val ml = new MLContext(sc)
+ml: org.apache.sysml.api.MLContext = org.apache.sysml.api.MLContext@33e38c6b
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+import org.apache.sysml.api.MLContext
+val ml = new MLContext(sc)
+{% endhighlight %}
+</div>
+
+</div>
+
+
+## Create DataFrame
+
+For demonstration purposes, we'll create a `DataFrame` consisting of 100,000 rows and 1,000 columns
+of random `double`s.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> import org.apache.spark.sql._
+import org.apache.spark.sql._
+
+scala> import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
+import org.apache.spark.sql.types.{StructType, StructField, DoubleType}
+
+scala> import scala.util.Random
+import scala.util.Random
+
+scala> val numRows = 100000
+numRows: Int = 100000
+
+scala> val numCols = 1000
+numCols: Int = 1000
+
+scala> val data = sc.parallelize(0 to numRows-1).map { _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) }
+data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[1] at map at <console>:33
+
+scala> val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } )
+schema: org.apache.spark.sql.types.StructType = StructType(StructField(C0,DoubleType,true), StructField(C1,DoubleType,true), StructField(C2,DoubleType,true), StructField(C3,DoubleType,true), StructField(C4,DoubleType,true), StructField(C5,DoubleType,true), StructField(C6,DoubleType,true), StructField(C7,DoubleType,true), StructField(C8,DoubleType,true), StructField(C9,DoubleType,true), StructField(C10,DoubleType,true), StructField(C11,DoubleType,true), StructField(C12,DoubleType,true), StructField(C13,DoubleType,true), StructField(C14,DoubleType,true), StructField(C15,DoubleType,true), StructField(C16,DoubleType,true), StructField(C17,DoubleType,true), StructField(C18,DoubleType,true), StructField(C19,DoubleType,true), StructField(C20,DoubleType,true), StructField(C21,DoubleType,true), ...
+
+scala> val df = sqlContext.createDataFrame(data, schema)
+df: org.apache.spark.sql.DataFrame = [C0: double, C1: double, C2: double, C3: double, C4: double, C5: double, C6: double, C7: double, C8: double, C9: double, C10: double, C11: double, C12: double, C13: double, C14: double, C15: double, C16: double, C17: double, C18: double, C19: double, C20: double, C21: double, C22: double, C23: double, C24: double, C25: double, C26: double, C27: double, C28: double, C29: double, C30: double, C31: double, C32: double, C33: double, C34: double, C35: double, C36: double, C37: double, C38: double, C39: double, C40: double, C41: double, C42: double, C43: double, C44: double, C45: double, C46: double, C47: double, C48: double, C49: double, C50: double, C51: double, C52: double, C53: double, C54: double, C55: double, C56: double, C57: double, C58: double, C5...
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+import org.apache.spark.sql._
+import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
+import scala.util.Random
+val numRows = 100000
+val numCols = 1000
+val data = sc.parallelize(0 to numRows-1).map { _ => Row.fromSeq(Seq.fill(numCols)(Random.nextDouble)) }
+val schema = StructType((0 to numCols-1).map { i => StructField("C" + i, DoubleType, true) } )
+val df = sqlContext.createDataFrame(data, schema)
+{% endhighlight %}
+</div>
+
+</div>
+
+
+## Helper Methods
+
+For convenience, we'll create some helper methods. The SystemML output data is encapsulated in
+an `MLOutput` object. The `getScalar()` method extracts a scalar value from a `DataFrame` returned by
+`MLOutput`. The `getScalarDouble()` method returns such a value as a `Double`, and the
+`getScalarInt()` method returns such a value as an `Int`.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> import org.apache.sysml.api.MLOutput
+import org.apache.sysml.api.MLOutput
+
+scala> def getScalar(outputs: MLOutput, symbol: String): Any =
+     | outputs.getDF(sqlContext, symbol).first()(1)
+getScalar: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Any
+
+scala> def getScalarDouble(outputs: MLOutput, symbol: String): Double =
+     | getScalar(outputs, symbol).asInstanceOf[Double]
+getScalarDouble: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Double
+
+scala> def getScalarInt(outputs: MLOutput, symbol: String): Int =
+     | getScalarDouble(outputs, symbol).toInt
+getScalarInt: (outputs: org.apache.sysml.api.MLOutput, symbol: String)Int
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+import org.apache.sysml.api.MLOutput
+def getScalar(outputs: MLOutput, symbol: String): Any =
+outputs.getDF(sqlContext, symbol).first()(1)
+def getScalarDouble(outputs: MLOutput, symbol: String): Double =
+getScalar(outputs, symbol).asInstanceOf[Double]
+def getScalarInt(outputs: MLOutput, symbol: String): Int =
+getScalarDouble(outputs, symbol).toInt
+
+{% endhighlight %}
+</div>
+
+</div>
+
+
+## Convert DataFrame to Binary-Block Matrix
+
+SystemML is optimized to operate on a binary-block format for matrix representation. For large
+datasets, conversion from DataFrame to binary-block can require a significant quantity of time.
+Explicit DataFrame to binary-block conversion allows algorithm performance to be measured separately
+from data conversion time.
+
+The SystemML binary-block matrix representation can be thought of as a two-dimensional array of blocks, where each block
+consists of a number of rows and columns. In this example, we specify a matrix consisting
+of blocks of size 1000x1000. The experimental `dataFrameToBinaryBlock()` method of `RDDConverterUtilsExt` is used
+to convert the `DataFrame df` to a SystemML binary-block matrix, which is represented by the datatype
+`JavaPairRDD[MatrixIndexes, MatrixBlock]`.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils}
+import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt=>RDDConverterUtils}
+
+scala> import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics
+
+scala> val numRowsPerBlock = 1000
+numRowsPerBlock: Int = 1000
+
+scala> val numColsPerBlock = 1000
+numColsPerBlock: Int = 1000
+
+scala> val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock)
+mc: org.apache.sysml.runtime.matrix.MatrixCharacteristics = [100000 x 1000, nnz=-1, blocks (1000 x 1000)]
+
+scala> val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false)
+sysMlMatrix: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock] = org.apache.spark.api.java.JavaPairRDD@2bce3248
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils}
+import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
+val numRowsPerBlock = 1000
+val numColsPerBlock = 1000
+val mc = new MatrixCharacteristics(numRows, numCols, numRowsPerBlock, numColsPerBlock)
+val sysMlMatrix = RDDConverterUtils.dataFrameToBinaryBlock(sc, df, mc, false)
+
+{% endhighlight %}
+</div>
+
+</div>
+
+
+## DML Script
+
+For this example, we will utilize the following DML Script called `shape.dml` that reads in a matrix and outputs the number of rows and the
+number of columns, each represented as a matrix.
+
+{% highlight r %}
+X = read($Xin)
+m = matrix(nrow(X), rows=1, cols=1)
+n = matrix(ncol(X), rows=1, cols=1)
+write(m, $Mout)
+write(n, $Nout)
+{% endhighlight %}
+
+
+## Execute Script
+
+Let's execute our DML script, as shown in the example below. The call to `reset()` of `MLContext` is not necessary here, but this method should
+be called if you need to reset inputs and outputs or if you would like to call `execute()` with a different script.
+
+An example of registering the `DataFrame df` as an input to the `X` variable is shown but commented out. If a DataFrame is registered directly,
+it will implicitly be converted to SystemML's binary-block format. However, since we've already explicitly converted the DataFrame to the
+binary-block fixed variable `sysMlMatrix`, we will register this input to the `X` variable. We register the `m` and `n` variables
+as outputs.
+
+When SystemML is executed via `DMLScript` (such as in Standalone Mode), inputs are supplied as either command-line named arguments
+or positional arguments. These inputs are specified in DML scripts by prepending them with a `$`. Values are read from or written
+to files using `read`/`write` (DML) and `load`/`save` (PyDML) statements. When utilizing the `MLContext` API,
+inputs and outputs can be other data representations, such as `DataFrame`s. The input and output data are bound to DML variables.
+The named arguments in the `shape.dml` script do not have default values set for them, so we create a `Map` to map the required named
+arguments to blank `String`s so that the script can pass validation.
+
+The `shape.dml` script is executed by the call to `execute()`, where we supply the `Map` of required named arguments. The
+execution results are returned as the `MLOutput` fixed variable `outputs`. The number of rows is obtained by calling the `getScalarInt()`
+helper method with the `outputs` object and `"m"`. The number of columns is retrieved by calling `getScalarInt()` with
+`outputs` and `"n"`.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> ml.reset()
+
+scala> //ml.registerInput("X", df) // implicit conversion of DataFrame to binary-block
+
+scala> ml.registerInput("X", sysMlMatrix, numRows, numCols)
+
+scala> ml.registerOutput("m")
+
+scala> ml.registerOutput("n")
+
+scala> val nargs = Map("Xin" -> " ", "Mout" -> " ", "Nout" -> " ")
+nargs: scala.collection.immutable.Map[String,String] = Map(Xin -> " ", Mout -> " ", Nout -> " ")
+
+scala> val outputs = ml.execute("shape.dml", nargs)
+15/10/12 16:29:15 WARN : Your hostname, derons-mbp.usca.ibm.com resolves to a loopback/non-reachable address: 127.0.0.1, but we couldn't find any external IP address!
+15/10/12 16:29:15 WARN OptimizerUtils: Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8 (java.version=1.7.0_80).
+outputs: org.apache.sysml.api.MLOutput = org.apache.sysml.api.MLOutput@4d424743
+
+scala> val m = getScalarInt(outputs, "m")
+m: Int = 100000
+
+scala> val n = getScalarInt(outputs, "n")
+n: Int = 1000
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+ml.reset()
+//ml.registerInput("X", df) // implicit conversion of DataFrame to binary-block
+ml.registerInput("X", sysMlMatrix, numRows, numCols)
+ml.registerOutput("m")
+ml.registerOutput("n")
+val nargs = Map("Xin" -> " ", "Mout" -> " ", "Nout" -> " ")
+val outputs = ml.execute("shape.dml", nargs)
+val m = getScalarInt(outputs, "m")
+val n = getScalarInt(outputs, "n")
+
+{% endhighlight %}
+</div>
+
+</div>
+
+
+## DML Script as String
+
+The `MLContext` API allows a DML script to be specified
+as a `String`. Here, we specify a DML script as a fixed `String` variable called `minMaxMeanScript`.
+This DML will find the minimum, maximum, and mean value of a matrix.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> val minMaxMeanScript: String =
+     | """
+     | Xin = read(" ")
+     | minOut = matrix(min(Xin), rows=1, cols=1)
+     | maxOut = matrix(max(Xin), rows=1, cols=1)
+     | meanOut = matrix(mean(Xin), rows=1, cols=1)
+     | write(minOut, " ")
+     | write(maxOut, " ")
+     | write(meanOut, " ")
+     | """
+minMaxMeanScript: String =
+"
+Xin = read(" ")
+minOut = matrix(min(Xin), rows=1, cols=1)
+maxOut = matrix(max(Xin), rows=1, cols=1)
+meanOut = matrix(mean(Xin), rows=1, cols=1)
+write(minOut, " ")
+write(maxOut, " ")
+write(meanOut, " ")
+"
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+val minMaxMeanScript: String =
+"""
+Xin = read(" ")
+minOut = matrix(min(Xin), rows=1, cols=1)
+maxOut = matrix(max(Xin), rows=1, cols=1)
+meanOut = matrix(mean(Xin), rows=1, cols=1)
+write(minOut, " ")
+write(maxOut, " ")
+write(meanOut, " ")
+"""
+
+{% endhighlight %}
+</div>
+
+</div>
+
+## Scala Wrapper for DML
+
+We can create a Scala wrapper for our invocation of the `minMaxMeanScript` DML `String`. The `minMaxMean()` method
+takes a `JavaPairRDD[MatrixIndexes, MatrixBlock]` parameter, which is a SystemML binary-block matrix representation.
+It also takes a `rows` parameter indicating the number of rows in the matrix, a `cols` parameter indicating the number
+of columns in the matrix, and an `MLContext` parameter. The `minMaxMean()` method
+returns a tuple consisting of the minimum value in the matrix, the maximum value in the matrix, and the computed
+mean value of the matrix.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> import org.apache.sysml.runtime.matrix.data.MatrixIndexes
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes
+
+scala> import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+
+scala> import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.api.java.JavaPairRDD
+
+scala> def minMaxMean(mat: JavaPairRDD[MatrixIndexes, MatrixBlock], rows: Int, cols: Int, ml: MLContext): (Double, Double, Double) = {
+     | ml.reset()
+     | ml.registerInput("Xin", mat, rows, cols)
+     | ml.registerOutput("minOut")
+     | ml.registerOutput("maxOut")
+     | ml.registerOutput("meanOut")
+     | val outputs = ml.executeScript(minMaxMeanScript)
+     | val minOut = getScalarDouble(outputs, "minOut")
+     | val maxOut = getScalarDouble(outputs, "maxOut")
+     | val meanOut = getScalarDouble(outputs, "meanOut")
+     | (minOut, maxOut, meanOut)
+     | }
+minMaxMean: (mat: org.apache.spark.api.java.JavaPairRDD[org.apache.sysml.runtime.matrix.data.MatrixIndexes,org.apache.sysml.runtime.matrix.data.MatrixBlock], rows: Int, cols: Int, ml: org.apache.sysml.api.MLContext)(Double, Double, Double)
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+import org.apache.sysml.runtime.matrix.data.MatrixIndexes
+import org.apache.sysml.runtime.matrix.data.MatrixBlock
+import org.apache.spark.api.java.JavaPairRDD
+def minMaxMean(mat: JavaPairRDD[MatrixIndexes, MatrixBlock], rows: Int, cols: Int, ml: MLContext): (Double, Double, Double) = {
+ml.reset()
+ml.registerInput("Xin", mat, rows, cols)
+ml.registerOutput("minOut")
+ml.registerOutput("maxOut")
+ml.registerOutput("meanOut")
+val outputs = ml.executeScript(minMaxMeanScript)
+val minOut = getScalarDouble(outputs, "minOut")
+val maxOut = getScalarDouble(outputs, "maxOut")
+val meanOut = getScalarDouble(outputs, "meanOut")
+(minOut, maxOut, meanOut)
+}
+
+{% endhighlight %}
+</div>
+
+</div>
+
+
+## Invoking DML via Scala Wrapper
+
+Here, we invoke `minMaxMeanScript` using our `minMaxMean()` Scala wrapper method. It returns a tuple
+consisting of the minimum value in the matrix, the maximum value in the matrix, and the mean value of the matrix.
+
+<div class="codetabs">
+
+<div data-lang="Spark Shell" markdown="1">
+{% highlight scala %}
+scala> val (min, max, mean) = minMaxMean(sysMlMatrix, numRows, numCols, ml)
+15/10/13 14:33:11 WARN OptimizerUtils: Auto-disable multi-threaded text read for 'text' and 'csv' due to thread contention on JRE < 1.8 (java.version=1.7.0_80).
+min: Double = 5.378949397005783E-9                                              
+max: Double = 0.9999999934660398
+mean: Double = 0.499988222338507
+
+{% endhighlight %}
+</div>
+
+<div data-lang="Statements" markdown="1">
+{% highlight scala %}
+val (min, max, mean) = minMaxMean(sysMlMatrix, numRows, numCols, ml)
+
+{% endhighlight %}
+</div>
+
+</div>
+
+
 * * *
 
 # Zeppelin Notebook Example - Linear Regression Algorithm - OLD API