You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ss...@apache.org on 2014/05/18 14:20:12 UTC

svn commit: r1595608 - /mahout/site/mahout_cms/trunk/content/users/sparkbindings/play-with-shell.mdtext

Author: ssc
Date: Sun May 18 12:20:12 2014
New Revision: 1595608

URL: http://svn.apache.org/r1595608
Log:
CMS commit to mahout by ssc

Modified:
    mahout/site/mahout_cms/trunk/content/users/sparkbindings/play-with-shell.mdtext

Modified: mahout/site/mahout_cms/trunk/content/users/sparkbindings/play-with-shell.mdtext
URL: http://svn.apache.org/viewvc/mahout/site/mahout_cms/trunk/content/users/sparkbindings/play-with-shell.mdtext?rev=1595608&r1=1595607&r2=1595608&view=diff
==============================================================================
--- mahout/site/mahout_cms/trunk/content/users/sparkbindings/play-with-shell.mdtext (original)
+++ mahout/site/mahout_cms/trunk/content/users/sparkbindings/play-with-shell.mdtext Sun May 18 12:20:12 2014
@@ -51,8 +51,7 @@ We'll use the shell to interactively pla
 
 Mahout's linear algebra DSL has an abstraction called *DistributedRowMatrix (DRM)* which models a matrix that is partitioned by rows and stored in the memory of a cluster of machines. We use ```dense()``` to create a dense in-core matrix from our toy dataset and use ```drmParallelize``` to load it into the cluster, "mimicking" a large, partitioned dataset.
 
-
-```
+<div class="codehilite"><pre>
 val drmData = drmParallelize(dense(
   (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
   (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
@@ -64,58 +63,58 @@ val drmData = drmParallelize(dense(
   (3, 2, 13,   7,  40.400208),  // Clusters
   (3, 3, 13,   4,  45.811716)), // Great Grains Pecan
   numPartitions = 2);
-```
+</pre></div>
 
 Have a look at this matrix. The first four columns represent the ingredients (our features) and the last column (the rating) is the target variable for our regression. [Linear regression](https://en.wikipedia.org/wiki/Linear_regression) assumes that the **target variable *y*** is generated by the linear combination of **the feature matrix *X*** with the **parameter vector *β*** plus the **noise *ε***, summarized in the formula ***y = Xβ + ε***. Our goal is to find an estimate of the parameter vector *β* that explains the data very well.
 
 As a first step, we extract *X* and *y* from our data matrix. We get *X* by slicing: we take all rows (denoted by ```::```) and the first four columns, which have the ingredients in milligrams as content. Note that the result is again a DRM. The shell will not execute this code yet, it saves the history of operations and defers the execution until we really access a result. **Mahout's DSL automatically optimizes and parallelizes all operations on DRMs and runs them on Apache Spark.**
 
-```
+<div class="codehilite"><pre>
 val drmX = drmData(::, 0 until 4)
-```
+</pre></div>
 
 Next, we extract the target variable vector *y*, the fifth column of the data matrix. We assume this one fits into our driver machine, so we fetch it in-core using ```collect```:
 
-```
+<div class="codehilite"><pre>
 val y = drmData.collect(::, 4)
-```
+</pre></div>
 
 Now we are ready to think about a mathematical way to estimate the parameter vector *β*. A simple textbook approach is [ordinary least squares (OLS)](https://en.wikipedia.org/wiki/Ordinary_least_squares), which minimizes the sum of residual squares. In OLS, there is even a closed form expression for estimating *ß* as ***(X<sup>T</sup>X)<sup>-1</sup> X<sup>T</sup>y***.
 
 The first thing which we compute for this is ***X<sup>T</sup>X***. The code for doing this in Mahout's scala DSL maps directly to the mathematical formula. The operation ```.t()``` transposes a matrix and analogous to R ```%*%``` denotes matrix multiplication.
 
-```
+<div class="codehilite"><pre>
 val drmXtX = drmX.t %*% drmX
-```
+</pre></div>
 
 The same is true for computing ***X<sup>T</sup>y***. We can simply type the math in scala expressions into the shell. Here, *X* lives in the cluster, while is *y* in the memory of the driver, and the result is a DRM again.
-```
+<div class="codehilite"><pre>
 val drmXty = drmX.t %*% y
-```
+</pre></div>
 
 We're nearly done. The next step we take is to fetch *X<sup>T</sup>X* and *X<sup>T</sup>y* into the memory of our driver machine (we are targeting features matrices that are tall and skinny , so we can assume that *X<sup>T</sup>X* is small enough to fit in). Then, we provide them to an in-core solver (Mahout provides the an analogon to R's ```solve()``` for that) which computes ```beta```, our OLS estimate of the parameter vector *β*.
 
-```
+<div class="codehilite"><pre>
 val XtX = drmXtX.collect
 val Xty = drmXty.collect(::, 0)
 
 val beta = solve(XtX, Xty)
-```
+</pre></div>
 
 That's it! We have a implemented a distributed linear regression algorithm on Apache Spark. I hope you agree that we didn't have to worry a lot about parallelization and distributed systems. The goal of Mahout's linear algebra DSL is to abstract away the ugliness of programming a distributed system as much as possible, while still retaining decent performance and scalability.
 
 We can now check how well our model fits its training data. First, we multiply the feature matrix *X* by our estimate of *β*. Then, we look at the difference (via L2-norm) of the target variable *y* to the fitted target variable:
 
-```
+<div class="codehilite"><pre>
 val yFitted = (drmX %*% beta).collect(::, 0)
 (y - yFitted).norm(2)
-```
+</pre></div>
 
 We hope that we could show that Mahout's shell allows people to interactively and incrementally write algorithms. We have entered a lot of individual commands, one-by-one, until we got the desired results. We can now refactor a little by wrapping our statements into easy-to-use functions. The definition of functions follows standard scala syntax. 
 
 We put all the commands for ordinary least squares into a function ```ols```. 
 
-```
+<div class="codehilite"><pre>
 def ols(drmX: DrmLike[Int], y: Vector) = {
 
   val XtX = (drmX.t %*% drmX).collect
@@ -123,22 +122,22 @@ def ols(drmX: DrmLike[Int], y: Vector) =
 
   solve(XtX, Xty)
 }
-```
+</pre></div>
 
 And we define a function ```goodnessOfFit``` that tells how well a model fits the target variable:
 
-```
+<div class="codehilite"><pre>
 def goodnessOfFit(drmX: DrmLike[Int], beta: Vector, y: Vector) = {
   val fittedY = (drmX %*% beta).collect(::, 0)
   (y - fittedY).norm(2)
 }
-```
+</pre></div>
 
 So far we have left out an important aspect of a standard linear regression model. Usually there is a constant bias term added to the model. Without that, our model always crosses through the origin and we only learn the right angle. An easy way to add such a bias term to our model is to add a column of ones to the feature matrix *X*. The corresponding weight in the parameter vector will then be the bias term.
 
 Mahout's DSL offers a ```mapBlock()``` method for custom modifications of a DRM. All the rows in a partition are merged to a block of the matrix which is given to custom code in a closure. For our example, we invoke ```mapBlock``` with ```ncol = drmX.ncol + 1``` to let the system know that change the number of columns of the matrix. The input to our closure is a ```block``` of the DRM and an array of ```keys``` for the rows contained in the block. In order to add a column, we first create a new block with an additional column, then copy the data from the current block into the new block and finally set the last column to ones and return the new block.
 
-```
+<div class="codehilite"><pre>
 val drmXwithBiasColumn = drmX.mapBlock(ncol = drmX.ncol + 1) {
   case(keys, block) =>
     // create a new block with an additional column
@@ -150,18 +149,18 @@ val drmXwithBiasColumn = drmX.mapBlock(n
 
     keys -> blockWithBiasColumn
 }
-```
+</pre></div>
+
 Now we can give the newly created DRM ```drmXwithBiasColumn``` to our model fitting method ```ols``` and see how well the resulting model fits the training data with ```goodnessOfFit```. You should see a large improvement in the result.
 
-```
+<div class="codehilite"><pre>
 val betaWithBiasTerm = ols(drmXwithBiasColumn, y)
 goodnessOfFit(drmXwithBiasColumn, betaWithBiasTerm, y)
-```
+</pre></div>
 
 As a further optimization, we can make use of the DSL's caching functionality. We use ```drmXwithBiasColumn``` repeatedly  as input to a computation, so it might be beneficial to cache it in memory. This is achieved by calling ```checkpoint()```. In the end, we remove it from the cache with uncache:
 
-
-```
+<div class="codehilite"><pre>
 val cachedDrmX = drmXwithBiasColumn.checkpoint()
 
 val betaWithBiasTerm = ols(cachedDrmX, y)
@@ -170,7 +169,7 @@ val goodness = goodnessOfFit(cachedDrmX,
 cachedDrmX.uncache()
 
 goodness
-```
+</pre></div>
 
 
 Liked what you saw? Checkout Mahout's overview for the [Scala and Spark bindings](https://mahout.apache.org/users/sparkbindings/home.html).