You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/03/17 23:45:26 UTC

[7/8] flink git commit: [FLINK-1697] [ml] Adds web documentation for alternating least squares. Adds web documentation for polynomial base feature mapper.

[FLINK-1697] [ml] Adds web documentation for alternating least squares. Adds web documentation for polynomial base feature mapper.

[ml] Adds comments

[ml] Set degree of parallelism of test suites to 2

[ml] Replaces FlatSpec tests with JUnit integration test cases in order to suppress the sysout output.

[ml] Adds missing clients-test jar

[docs] Sets jekyll's baseurl to http://ci.apache.org/projects/flink/flink-docs-master

[ml] Replaces JBlas by java netlib to avoid license issues of included fortran libraries

[ml] Adds com.github.fommil.netlib:core to license file

[ml] Adds Scala docs to FlinkTools

[ml] Adds comments to LabeledVector and the math package object

This closes #479.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e2d96f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e2d96f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e2d96f

Branch: refs/heads/master
Commit: 21e2d96f893e4460a8d85c501e31dc09ed2f0043
Parents: ff83c8c
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 10 15:41:40 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Mar 17 23:28:34 2015 +0100

----------------------------------------------------------------------
 LICENSE                                         |   1 -
 docs/_config.yml                                |   4 +-
 docs/build_docs.sh                              |   2 +-
 docs/ml/alternating_least_squares.md            | 131 +++++++++++++++-
 docs/ml/multiple_linear_regression.md           |  57 ++++---
 docs/ml/polynomial_base_feature_extractor.md    |  28 ----
 docs/ml/polynomial_base_feature_mapper.md       |  91 +++++++++++
 flink-clients/pom.xml                           |  11 ++
 flink-dist/src/main/flink-bin/LICENSE           |   1 +
 .../apache/flink/runtime/client/JobClient.scala |   2 +-
 flink-staging/flink-ml/pom.xml                  |  14 +-
 .../apache/flink/ml/common/ChainedLearner.scala |  13 ++
 .../flink/ml/common/ChainedTransformer.scala    |  12 ++
 .../org/apache/flink/ml/common/FlinkTools.scala |  88 +++++++++--
 .../apache/flink/ml/common/LabeledVector.scala  |   6 +
 .../org/apache/flink/ml/common/Learner.scala    |  14 +-
 .../apache/flink/ml/common/Transformer.scala    |  24 ++-
 .../flink/ml/feature/PolynomialBase.scala       |   4 +-
 .../org/apache/flink/ml/math/DenseVector.scala  |   9 ++
 .../scala/org/apache/flink/ml/math/JBlas.scala  |  70 ---------
 .../scala/org/apache/flink/ml/math/Vector.scala |   7 +
 .../org/apache/flink/ml/math/package.scala      |   9 +-
 .../apache/flink/ml/recommendation/ALS.scala    |  98 ++++++------
 .../regression/MultipleLinearRegression.scala   |  79 ++++++----
 .../flink/ml/feature/PolynomialBaseITCase.scala | 132 ++++++++++++++++
 .../flink/ml/feature/PolynomialBaseSuite.scala  | 118 --------------
 .../flink/ml/recommendation/ALSITCase.scala     | 152 +++++++++++++++++++
 .../flink/ml/recommendation/ALSSuite.scala      | 141 -----------------
 .../MultipleLinearRegressionITCase.scala        | 115 ++++++++++++++
 .../MultipleLinearRegressionSuite.scala         | 100 ------------
 .../flink/ml/regression/RegressionData.scala    |   3 +-
 31 files changed, 956 insertions(+), 580 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index e58d1a5..85d0d85 100644
--- a/LICENSE
+++ b/LICENSE
@@ -228,7 +228,6 @@ The Apache Flink project bundles the following files under the MIT License:
  - normalize.css v3.0.0 (http://git.io/normalize) - Copyright (c) Nicolas Gallagher and Jonathan Neal
  - Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy
  - D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt
- - scopt (http://github.com/scopt/scopt)
 
 All rights reserved.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 612aa6f..d7cf349 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -30,7 +30,7 @@ FLINK_SCALA_VERSION_SHORT: "2.10"
 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
 FLINK_GITHUB_URL:  https://github.com/apache/flink
 
-FLINK_WEBSITE_URL: http://flink.apache.org/
+FLINK_WEBSITE_URL: http://flink.apache.org
 FLINK_DOWNLOAD_URL: http://flink.apache.org/downloads.html
 
 FLINK_DOWNLOAD_URL_HADOOP1_STABLE: http://www.apache.org/dyn/closer.cgi/flink/flink-0.8.1/flink-0.8.1-bin-hadoop1.tgz
@@ -59,3 +59,5 @@ kramdown:
     toc_levels: 1..3
 
 host: localhost
+
+baseurl: http://ci.apache.org/projects/flink/flink-docs-master

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/build_docs.sh
----------------------------------------------------------------------
diff --git a/docs/build_docs.sh b/docs/build_docs.sh
index 4f8a7c9..b65f7c9 100755
--- a/docs/build_docs.sh
+++ b/docs/build_docs.sh
@@ -54,7 +54,7 @@ JEKYLL_CMD="build"
 while getopts ":p" opt; do
 	case $opt in
 		p)
-		JEKYLL_CMD="serve --watch"
+		JEKYLL_CMD="serve --baseurl "" --watch"
 		;;
 	esac
 done

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/alternating_least_squares.md
----------------------------------------------------------------------
diff --git a/docs/ml/alternating_least_squares.md b/docs/ml/alternating_least_squares.md
index bf97b1b..7a4a5d5 100644
--- a/docs/ml/alternating_least_squares.md
+++ b/docs/ml/alternating_least_squares.md
@@ -1,4 +1,5 @@
 ---
+mathjax: include
 title: Alternating Least Squares
 ---
 <!--
@@ -25,4 +26,132 @@ under the License.
 
 ## Description
 
-## Parameters
\ No newline at end of file
+The alternating least squares (ALS) algorithm factorizes a given matrix $R$ into two factors $U$ and $V$ such that $R \approx U^TV$.
+The unknown row dimension of $U$ and $V$ is given as a parameter to the algorithm and is called the number of latent factors.
+Since matrix factorization can be used in the context of recommendation, the matrices $U$ and $V$ can be called user and item matrix, respectively.
+The $i$th column of the user matrix is denoted by $u_i$ and the $i$th column of the item matrix is $v_i$.
+The matrix $R$ can be called the ratings matrix with $$(R)_{i,j} = r_{i,j}$$.
+
+In order to find the user and item matrix, the following problem is solved:
+
+$$\arg\min_{U,V} \sum_{\{i,j\mid r_{i,j} \not= 0\}} \left(r_{i,j} - u_{i}^Tv_{j}\right)^2 + 
+\lambda \left(\sum_{i} n_{u_i} \left\lVert u_i \right\rVert^2 + \sum_{j} n_{v_j} \left\lVert v_j \right\rVert^2 \right)$$
+
+with $\lambda$ being the regularization factor, $$n_{u_i}$$ being the number of items the user $i$ has rated and $$n_{v_j}$$ being the number of times the item $j$ has been rated.
+This regularization scheme to avoid overfitting is called weighted-$\lambda$-regularization.
+Details can be found in the work of [Zhou et al.](http://dx.doi.org/10.1007/978-3-540-68880-8_32).
+
+By fixing one of the matrices $U$ or $V$, we obtain a quadratic form which can be solved directly.
+The solution of the modified problem is guaranteed to monotonically decrease the overall cost function.
+By applying this step alternately to the matrices $U$ and $V$, we can iteratively improve the matrix factorization.
+
+The matrix $R$ is given in its sparse representation as a tuple of $(i, j, r)$ where $i$ denotes the row index, $j$ the column index and $r$ is the matrix value at position $(i,j)$.
+
+
+## Parameters
+
+The alternating least squares implementation can be controlled by the following parameters:
+
+   <table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Parameters</th>
+        <th class="text-center">Description</th>
+      </tr>
+    </thead>
+
+    <tbody>
+      <tr>
+        <td><strong>NumFactors</strong></td>
+        <td>
+          <p>
+            The number of latent factors to use for the underlying model.
+            It is equivalent to the dimension of the calculated user and item vectors.
+            (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Lambda</strong></td>
+        <td>
+          <p>
+            Regularization factor. Tune this value in order to avoid overfitting or poor performance due to strong generalization.
+            (Default value: <strong>1</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Iterations</strong></td>
+        <td>
+          <p>
+            The maximum number of iterations.
+            (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Blocks</strong></td>
+        <td>
+          <p>
+            The number of blocks into which the user and item matrix are grouped.
+            The fewer blocks one uses, the less data is sent redundantly. 
+            However, bigger blocks entail bigger update messages which have to be stored on the heap. 
+            If the algorithm fails because of an OutOfMemoryError, then try to increase the number of blocks.
+            (Default value: <strong>None</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Seed</strong></td>
+        <td>
+          <p>
+            Random seed used to generate the initial item matrix for the algorithm.
+            (Default value: <strong>0</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>TemporaryPath</strong></td>
+        <td>
+          <p>
+            Path to a temporary directory into which intermediate results are stored. 
+            If this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration, and a post-processing step which calculates a last ALS half-step.
+            The preprocessing steps calculate the <code>OutBlockInformation</code> and <code>InBlockInformation</code> for the given rating matrix. 
+            The results of the individual steps are stored in the specified directory.
+            By splitting the algorithm into multiple smaller steps, Flink does not have to split the available memory amongst too many operators. 
+            This allows the system to process bigger individual messages and improves the overall performance.
+            (Default value: <strong>None</strong>)
+          </p>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+
+## Examples
+
+{% highlight scala %}
+// Read input data set from a csv file
+val inputDS: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](
+  pathToTrainingFile)
+
+// Setup the ALS learner
+val als = ALS()
+.setIterations(10)
+.setNumFactors(10)
+.setBlocks(100)
+.setTemporaryPath("hdfs://tempPath")
+
+// Set the other parameters via a parameter map
+val parameters = ParameterMap()
+.add(ALS.Lambda, 0.9)
+.add(ALS.Seed, 42)
+
+// Calculate the factorization
+val factorization = als.fit(inputDS, parameters)
+
+// Read the testing data set from a csv file
+val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
+
+// Calculate the ratings according to the matrix factorization
+val predictedRatings = factorization.transform(testingDS)
+{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/multiple_linear_regression.md
----------------------------------------------------------------------
diff --git a/docs/ml/multiple_linear_regression.md b/docs/ml/multiple_linear_regression.md
index e98ccc0..840e899 100644
--- a/docs/ml/multiple_linear_regression.md
+++ b/docs/ml/multiple_linear_regression.md
@@ -62,21 +62,46 @@ under the License.
 ## Parameters
 
   The multiple linear regression implementation can be controlled by the following parameters:
-
-Iterations
-: The maximum number of iterations.
-(Default value: **10**)
-
-Stepsize
-: Initial step size for the gradient descent method.
-This value controls how far the gradient descent method moves in the opposite direction of the gradient.
-Tuning this parameter might be crucial to make it stable and to obtain a better performance.
-(Default value: **0.1**)
-
-ConvergenceThreshold
-: Threshold for relative change of the sum of squared residuals until the iteration is stopped.
-(Default value: **None**)
-
+  
+   <table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Parameters</th>
+        <th class="text-center">Description</th>
+      </tr>
+    </thead>
+
+    <tbody>
+      <tr>
+        <td><strong>Iterations</strong></td>
+        <td>
+          <p>
+            The maximum number of iterations. (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Stepsize</strong></td>
+        <td>
+          <p>
+            Initial step size for the gradient descent method.
+            This value controls how far the gradient descent method moves in the opposite direction of the gradient.
+            Tuning this parameter might be crucial to make it stable and to obtain a better performance. 
+            (Default value: <strong>0.1</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>ConvergenceThreshold</strong></td>
+        <td>
+          <p>
+            Threshold for relative change of the sum of squared residuals until the iteration is stopped.
+            (Default value: <strong>None</strong>)
+          </p>
+        </td>
+      </tr>
+    </tbody>
+  </table>
 
 ## Examples
 
@@ -97,5 +122,3 @@ val model = mlr.fit(trainingDS)
 // Calculate the predictions for the test data
 val predictions = model.transform(testingDS)
 {% endhighlight %}
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/polynomial_base_feature_extractor.md
----------------------------------------------------------------------
diff --git a/docs/ml/polynomial_base_feature_extractor.md b/docs/ml/polynomial_base_feature_extractor.md
deleted file mode 100644
index ececec3..0000000
--- a/docs/ml/polynomial_base_feature_extractor.md
+++ /dev/null
@@ -1,28 +0,0 @@
----
-title: Polynomial Base Feature Extractor
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-## Description
-
-## Parameters
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/polynomial_base_feature_mapper.md
----------------------------------------------------------------------
diff --git a/docs/ml/polynomial_base_feature_mapper.md b/docs/ml/polynomial_base_feature_mapper.md
new file mode 100644
index 0000000..2964f04
--- /dev/null
+++ b/docs/ml/polynomial_base_feature_mapper.md
@@ -0,0 +1,91 @@
+---
+mathjax: include
+title: Polynomial Base Feature Mapper
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+The polynomial base feature mapper maps a vector into the polynomial feature space of degree $d$.
+The dimension of the input vector determines the number of polynomial factors whose values are the respective vector entries.
+Given a vector $(x, y, z, \ldots)^T$ the resulting feature vector looks like:
+
+$$\left(x, y, z, x^2, xy, xz, y^2, yz, z^2, x^3, x^2y, x^2z, xy^2, xyz, xz^2, y^3, \ldots\right)^T$$
+
+Flink's implementation orders the polynomials in decreasing order of their degree.
+
+Given the vector $\left(3,2\right)^T$, the polynomial base feature vector of degree 3 would look like
+ 
+ $$\left(3^3, 3^2\cdot2, 3\cdot2^2, 2^3, 3^2, 3\cdot2, 2^2, 3, 2\right)^T$$
+
+This transformer can be prepended to all `Transformer` and `Learner` implementations which expect an input of type `LabeledVector`.
+
+## Parameters
+
+The polynomial base feature mapper can be controlled by the following parameters:
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Parameters</th>
+        <th class="text-center">Description</th>
+      </tr>
+    </thead>
+
+    <tbody>
+      <tr>
+        <td><strong>Degree</strong></td>
+        <td>
+          <p>
+            The maximum polynomial degree. 
+            (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+    </tbody>
+  </table>
+
+## Examples
+
+{% highlight scala %}
+// Obtain the training data set
+val trainingDS: DataSet[LabeledVector] = ...
+
+// Set up the polynomial base feature mapper of degree 3
+val polyBase = PolynomialBase()
+.setDegree(3)
+
+// Setup the multiple linear regression learner
+val mlr = MultipleLinearRegression()
+
+// Control the learner via the parameter map
+val parameters = ParameterMap()
+.add(MultipleLinearRegression.Iterations, 20)
+.add(MultipleLinearRegression.Stepsize, 0.5)
+
+// Create pipeline PolynomialBase -> MultipleLinearRegression
+val chained = polyBase.chain(mlr)
+
+// Learn the model
+val model = chained.fit(trainingDS)
+{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index d7dccad..95d17d7 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -116,6 +116,17 @@ under the License.
 	<build>
 		<plugins>
 			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
 				<artifactId>maven-assembly-plugin</artifactId>
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 				<executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index 89d8eca..d0b7fb4 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -302,6 +302,7 @@ The Apache Flink project bundles the following components under
 BSD-style licenses:
  
 [3-clause BSD license]
+ - core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
  - Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
  - D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
  - LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp.

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 19b3050..f1c6450 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.{JobID, JobGraph}
+import org.apache.flink.runtime.jobgraph.JobGraph
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait}
 import org.apache.flink.runtime.messages.JobManagerMessages._

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml
index 53188e1..24ba591 100644
--- a/flink-staging/flink-ml/pom.xml
+++ b/flink-staging/flink-ml/pom.xml
@@ -46,9 +46,9 @@
 		</dependency>
 
 		<dependency>
-			<groupId>org.jblas</groupId>
-			<artifactId>jblas</artifactId>
-			<version>1.2.3</version>
+			<groupId>com.github.fommil.netlib</groupId>
+			<artifactId>core</artifactId>
+			<version>1.1.2</version>
 		</dependency>
 
 		<dependency>
@@ -57,6 +57,14 @@
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-clients</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
index cd0f403..b1a0a2f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
@@ -20,6 +20,19 @@ package org.apache.flink.ml.common
 
 import org.apache.flink.api.scala.DataSet
 
+/** This class represents a [[org.apache.flink.ml.common.Learner]] which is chained to a
+  * [[Transformer]].
+  *
+  * Calling the method `fit` on this object will pipe the input data through the given
+  * [[Transformer]], whose output is fed to the [[Learner]].
+  *
+  * @param head Preceding [[Transformer]] pipeline
+  * @param tail [[Learner]] instance
+  * @tparam IN Type of the training data
+  * @tparam TEMP Type of the data produced by the transformer pipeline and input type to the
+  *              [[Learner]]
+  * @tparam OUT Type of the trained model
+  */
 class ChainedLearner[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
                                     val tail: Learner[TEMP, OUT])
   extends Learner[IN, OUT] {

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
index 9a262cb..3f108bf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
@@ -20,6 +20,18 @@ package org.apache.flink.ml.common
 
 import org.apache.flink.api.scala.DataSet
 
+/** This class represents a chain of multiple [[Transformer]].
+  *
+  * Calling the method `transform` on this object will first apply the preceding [[Transformer]] to
+  * the input data. The resulting output data is then fed to the succeeding [[Transformer]].
+  *
+  * @param head Preceding [[Transformer]]
+  * @param tail Succeeding [[Transformer]]
+  * @tparam IN Type of incoming elements
+  * @tparam TEMP Type of output elements of the preceding [[Transformer]] and input type of
+  *              succeeding [[Transformer]]
+  * @tparam OUT Type of outgoing elements
+  */
 class ChainedTransformer[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
                                         val tail: Transformer[TEMP, OUT])
   extends Transformer[IN, OUT] {

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
index 2b12f30..22bbe82 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
@@ -27,18 +27,24 @@ import org.apache.flink.core.fs.Path
 
 import scala.reflect.ClassTag
 
-/**
- * Collection of convenience functions
- */
+/** FlinkTools contains a set of convenience functions for Flink's machine learning library:
+  *
+  *  - persist:
+  *  Takes up to 5 [[DataSet]]s and file paths. Each [[DataSet]] is written to the specified
+  *  path and subsequently re-read from disk. This method can be used to effectively split the
+  *  execution graph at the given [[DataSet]]. Writing it to disk triggers its materialization
+  *  and specifying it as a source will prevent the re-execution of it.
+  */
 object FlinkTools {
 
-  /**
-   *
-   * @param dataset
-   * @param path
-   * @tparam T
-   * @return
-   */
+  /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent
+    * operations.
+    *
+    * @param dataset [[DataSet]] to write to disk
+    * @param path File path to write dataset to
+    * @tparam T Type of the [[DataSet]] elements
+    * @return [[DataSet]] reading the just written file
+    */
   def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
     val env = dataset.getExecutionEnvironment
     val outputFormat = new TypeSerializerOutputFormat[T]
@@ -57,6 +63,17 @@ object FlinkTools {
     env.createInput(inputFormat)
   }
 
+  /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+    * subsequent operations.
+    *
+    * @param ds1 First [[DataSet]] to write to disk
+    * @param ds2 Second [[DataSet]] to write to disk
+    * @param path1 Path for ds1
+    * @param path2 Path for ds2
+    * @tparam A Type of the first [[DataSet]]'s elements
+    * @tparam B Type of the second [[DataSet]]'s elements
+    * @return Tuple of [[DataSet]]s reading the just written files
+    */
   def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation](ds1: DataSet[A], ds2:
   DataSet[B], path1: String, path2: String):(DataSet[A], DataSet[B])  = {
     val env = ds1.getExecutionEnvironment
@@ -88,6 +105,20 @@ object FlinkTools {
     (env.createInput(if1), env.createInput(if2))
   }
 
+  /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+    * subsequent operations.
+    *
+    * @param ds1 First [[DataSet]] to write to disk
+    * @param ds2 Second [[DataSet]] to write to disk
+    * @param ds3 Third [[DataSet]] to write to disk
+    * @param path1 Path for ds1
+    * @param path2 Path for ds2
+    * @param path3 Path for ds3
+    * @tparam A Type of first [[DataSet]]'s elements
+    * @tparam B Type of second [[DataSet]]'s elements
+    * @tparam C Type of third [[DataSet]]'s elements
+    * @return Tuple of [[DataSet]]s reading the just written files
+    */
   def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation,
   C: ClassTag: TypeInformation](ds1: DataSet[A], ds2:  DataSet[B], ds3: DataSet[C], path1:
   String, path2: String, path3: String): (DataSet[A], DataSet[B], DataSet[C])  = {
@@ -131,6 +162,23 @@ object FlinkTools {
     (env.createInput(if1), env.createInput(if2), env.createInput(if3))
   }
 
+  /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+    * subsequent operations.
+    *
+    * @param ds1 First [[DataSet]] to write to disk
+    * @param ds2 Second [[DataSet]] to write to disk
+    * @param ds3 Third [[DataSet]] to write to disk
+    * @param ds4 Fourth [[DataSet]] to write to disk
+    * @param path1 Path for ds1
+    * @param path2 Path for ds2
+    * @param path3 Path for ds3
+    * @param path4 Path for ds4
+    * @tparam A Type of first [[DataSet]]'s elements
+    * @tparam B Type of second [[DataSet]]'s elements
+    * @tparam C Type of third [[DataSet]]'s elements
+    * @tparam D Type of fourth [[DataSet]]'s elements
+    * @return Tuple of [[DataSet]]s reading the just written files
+    */
   def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation,
   C: ClassTag: TypeInformation, D: ClassTag: TypeInformation](ds1: DataSet[A], ds2:  DataSet[B],
                                                               ds3: DataSet[C], ds4: DataSet[D],
@@ -188,6 +236,26 @@ object FlinkTools {
     (env.createInput(if1), env.createInput(if2), env.createInput(if3), env.createInput(if4))
   }
 
+  /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+    * subsequent operations.
+    *
+    * @param ds1 First [[DataSet]] to write to disk
+    * @param ds2 Second [[DataSet]] to write to disk
+    * @param ds3 Third [[DataSet]] to write to disk
+    * @param ds4 Fourth [[DataSet]] to write to disk
+    * @param ds5 Fifth [[DataSet]] to write to disk
+    * @param path1 Path for ds1
+    * @param path2 Path for ds2
+    * @param path3 Path for ds3
+    * @param path4 Path for ds4
+    * @param path5 Path for ds5
+    * @tparam A Type of first [[DataSet]]'s elements
+    * @tparam B Type of second [[DataSet]]'s elements
+    * @tparam C Type of third [[DataSet]]'s elements
+    * @tparam D Type of fourth [[DataSet]]'s elements
+    * @tparam E Type of fifth [[DataSet]]'s elements
+    * @return Tuple of [[DataSet]]s reading the just written files
+    */
   def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation,
   C: ClassTag: TypeInformation, D: ClassTag: TypeInformation, E: ClassTag: TypeInformation]
   (ds1: DataSet[A], ds2:  DataSet[B], ds3: DataSet[C], ds4: DataSet[D], ds5: DataSet[E], path1:

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
index 3c4a257..f3d6172 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
@@ -20,4 +20,10 @@ package org.apache.flink.ml.common
 
 import org.apache.flink.ml.math.Vector
 
+/** This class represents a vector with an associated label as it is required for many supervised
+  * learning tasks.
+  *
+  * @param vector Data point
+  * @param label Label of the data point
+  */
 case class LabeledVector(vector: Vector, label: Double) {}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
index 0d56dc8..c8082c7 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
@@ -19,8 +19,20 @@
 package org.apache.flink.ml.common
 
 import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.WithParameters
 
+/** Base trait for an algorithm which trains a model based on some training data.
+  *
+  * The idea is that all algorithms which train a model implement this trait. That way
+  * they can be chained with a [[Transformer]] which acts as a preprocessing step for the actual
+  * learning. In that sense, a [[Learner]] denotes the end of a pipeline and cannot be further
+  * chained.
+  *
+  * Every learner has to implement the `fit` method which takes the training data and learns
+  * a model from the data.
+  *
+  * @tparam IN Type of the training data
+  * @tparam OUT Type of the trained model
+  */
 trait Learner[IN, OUT] extends WithParameters {
   def fit(input: DataSet[IN], parameters: ParameterMap = ParameterMap.Empty): OUT
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
index 76abc62..02d63cf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
@@ -20,14 +20,24 @@ package org.apache.flink.ml.common
 
 import org.apache.flink.api.scala.DataSet
 
-/**
- * A transformer represents
- *
- * @tparam IN Type of incoming elements
- * @tparam OUT Type of outgoing elements
- */
+/** Base trait for an algorithm which transforms the input data to some output data.
+  *
+  * A [[Transformer]] is used to transform input data to some output data. Transformations might
+  * be feature extractors, feature mappings, whitening or centralization just to name a few.
+  *
+  * A [[Transformer]] can be chained with another [[Transformer]] creating a [[ChainedTransformer]],
+  * which again can be chained. Chaining a [[Transformer]] with a [[Learner]] creates a
+  * [[ChainedLearner]] which terminates a pipeline.
+  *
+  * A [[Transformer]] implementation has to implement the method `transform`, which defines how
+  * the input data is transformed into the output data.
+  *
+  * @tparam IN Type of incoming elements
+  * @tparam OUT Type of outgoing elements
+  */
 trait Transformer[IN, OUT] extends WithParameters {
-  def chain[CHAINED](transformer: Transformer[OUT, CHAINED]): ChainedTransformer[IN, OUT, CHAINED] = {
+  def chain[CHAINED](transformer: Transformer[OUT, CHAINED]):
+  ChainedTransformer[IN, OUT, CHAINED] = {
     new ChainedTransformer[IN, OUT, CHAINED](this, transformer)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
index 632ded6..04f698e 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
@@ -28,13 +28,13 @@ import org.apache.flink.api.scala._
 /** Maps a vector into the polynomial feature space.
   *
   * This transformer takes a a vector of values `(x, y, z, ...)` and maps it into the
-  * polynomial feature space of degree `n`. That is to say, it calculates the following
+  * polynomial feature space of degree `d`. That is to say, it calculates the following
   * representation:
   *
   * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T`
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.commonLearner]] implementations which expect an input of
+  * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
   * [[LabeledVector]].
   *
   * @example

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
index 8e0eed0..d407a70 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
@@ -60,6 +60,15 @@ case class DenseVector(val values: Array[Double]) extends Vector {
       case _ => false
     }
   }
+
+  /**
+   * Copies the vector instance
+   *
+   * @return Copy of the vector instance
+   */
+  override def copy: Vector = {
+    DenseVector(values.clone())
+  }
 }
 
 object DenseVector {

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala
deleted file mode 100644
index 5d6eca4..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import org.jblas.DoubleMatrix
-
-/**
- * Convenience functions for the interaction with JBlas. If you want to use JBlas and allow an
- * easy transition from Flink's matrix abstraction to JBlas's and vice versa, simply import
- * all elements contained in the JBlas object.
- */
-object JBlas {
-
-  /**
-   * Implicit conversion from Flink's [[DenseMatrix]] to JBlas's [[DoubleMatrix]]
-   *
-   * @param matrix DenseMatrix to be converted
-   * @return DoubleMatrix resulting from the given matrix
-   */
-  implicit def denseMatrix2JBlas(matrix: DenseMatrix): DoubleMatrix = {
-    new DoubleMatrix(matrix.numRows, matrix.numCols, matrix.values: _*)
-  }
-
-  /**
-   * Implicit class to extends [[DoubleMatrix]] such that Flink's [[DenseMatrix]] and
-   * [[DenseVector]] can easily be retrieved from.
-   * @param matrix
-   */
-  implicit class RichDoubleMatrix(matrix: DoubleMatrix) {
-    def fromJBlas: DenseMatrix = DenseMatrix(matrix.rows, matrix.columns, matrix.data)
-
-    def fromJBlas2Vector: DenseVector = {
-      require(matrix.columns == 1, "The JBlas matrix contains more than 1 column.")
-
-      DenseVector(matrix.data)
-    }
-  }
-
-  /**
-   * Implicit conversion from Flink's [[Vector]] to JBlas's [[DoubleMatrix]]
-   *
-   * @param vector Vector to be converted
-   * @return DoubleMatrix resulting from the given vector
-   */
-  implicit def vector2JBlas(vector: Vector): DoubleMatrix = {
-    vector match {
-      case x: DenseVector => denseVector2JBlas(x)
-    }
-  }
-
-  private def denseVector2JBlas(vector: DenseVector): DoubleMatrix = {
-    new DoubleMatrix(vector.size, 1, vector.values: _*)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
index ddda003..20d820c 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
@@ -36,4 +36,11 @@ trait Vector {
    * @return element with index
    */
   def apply(index: Int): Double
+
+  /**
+   * Copies the vector instance
+   *
+   * @return Copy of the vector instance
+   */
+  def copy: Vector
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
index fce008a..e82e38f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
@@ -19,7 +19,8 @@
 package org.apache.flink.ml
 
 /**
- * Convenience to handle Flink's [[org.apache.flink.ml.math.Matrix]] and [[Vector]] abstraction.
+ * Convenience methods to handle Flink's [[org.apache.flink.ml.math.Matrix]] and [[Vector]]
+ * abstraction.
  */
 package object math {
   implicit class RichMatrix(matrix: Matrix) extends Iterable[Double] {
@@ -38,4 +39,10 @@ package object math {
       }
     }
   }
+
+  implicit def vector2Array(vector: Vector): Array[Double] = {
+    vector match {
+      case dense: DenseVector => dense.values
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index 1051ae5..5ff59d1 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -13,11 +13,12 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.flink.ml.recommendation
 
-import java.lang
+import java.{util, lang}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.operators.Order
@@ -28,7 +29,9 @@ import org.apache.flink.types.Value
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction}
 
-import org.jblas.{Solve, SimpleBlas, DoubleMatrix}
+import com.github.fommil.netlib.BLAS.{ getInstance => blas }
+import com.github.fommil.netlib.LAPACK.{ getInstance => lapack }
+import org.netlib.util.intW
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -43,21 +46,23 @@ import scala.util.Random
   * column of the item matrix is `v_i`. The matrix `R` is called the ratings matrix and
   * `(R)_{i,j} = r_{i,j}`.
   *
-  * In order to find the user and item matrix the following problem is solved:
+  * In order to find the user and item matrix, the following problem is solved:
   *
   * `argmin_{U,V} sum_(i,j\ with\ r_{i,j} != 0) (r_{i,j} - u_{i}^Tv_{j})^2 +
   * \lambda (sum_(i) n_{u_i} ||u_i||^2 + sum_(j) n_{v_j} ||v_j||^2)`
   *
-  * Overfitting is avoided by using a weighted-lambda-regularization scheme.
+  * with `\lambda` being the regularization factor, `n_{u_i}` being the number of items the user `i`
+  * has rated and `n_{v_j}` being the number of times the item `j` has been rated. This
+  * regularization scheme to avoid overfitting is called weighted-lambda-regularization. Details
+  * can be found in the work of [[http://dx.doi.org/10.1007/978-3-540-68880-8_32 Zhou et al.]].
   *
   * By fixing one of the matrices `U` or `V` one obtains a quadratic form which can be solved. The
   * solution of the modified problem is guaranteed to decrease the overall cost function. By
   * applying this step alternately to the matrices `U` and `V`, we can iteratively improve the
-  * overall solution. Details can be found in the work of
-  * [[http://dx.doi.org/10.1007/978-3-540-68880-8_32 Zhou et al.]].
+  * matrix factorization.
   *
   * The matrix `R` is given in its sparse representation as a tuple of `(i, j, r)` where `i` is the
-  * row index, `j` is the column index and `r` is the matrix a position `(i,j)`.
+  * row index, `j` is the column index and `r` is the matrix value at position `(i,j)`.
   *
   * @example
   *          {{{
@@ -68,7 +73,7 @@ import scala.util.Random
   *               .setIterations(10)
   *               .setNumFactors(10)
   *
-  *             val model = als.fit(inputDS))
+  *             val model = als.fit(inputDS)
   *
   *             val data2Predict: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
   *
@@ -79,20 +84,23 @@ import scala.util.Random
   *
   *  - [[ALS.NumFactors]]:
   *  The number of latent factors. It is the dimension of the calculated user and item vectors.
+  *  (Default value: '''10''')
   *
   *  - [[ALS.Lambda]]:
   *  Regularization factor. Tune this value in order to avoid overfitting/generalization.
+  *  (Default value: '''1''')
   *
-  *  - [[ALS.Iterations]]: The number of iterations to perform.
+  *  - [[ALS.Iterations]]: The number of iterations to perform. (Default value: '''10''')
   *
   *  - [[ALS.Blocks]]:
   *  The number of blocks into which the user and item matrix a grouped. The fewer
   *  blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger
   *  update messages which have to be stored on the Heap. If the algorithm fails because of
-  *  an OutOfMemoryException, then try to increase the number of blocks.
+  *  an OutOfMemoryException, then try to increase the number of blocks. (Default value: '''None''')
   *
   *  - [[ALS.Seed]]:
-  *  Random seed used to generate the initial item matrix for the algorithm
+  *  Random seed used to generate the initial item matrix for the algorithm.
+  *  (Default value: '''0''')
   *
   *  - [[ALS.TemporaryPath]]:
   *  Path to a temporary directory into which intermediate results are stored. If
@@ -103,7 +111,7 @@ import scala.util.Random
   *  the individual steps are stored in the specified directory. By splitting the algorithm
   *  into multiple smaller steps, Flink does not have to split the available memory amongst too many
   *  operators. This allows the system to process bigger individual messasges and improves the
-  *  overall performance.
+  *  overall performance. (Default value: '''None''')
   *
   * The ALS implementation is based on Spark's MLLib implementation of ALS which you can find
   * [[https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/
@@ -318,10 +326,10 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
 
         // in order to save space, store only the upper triangle of the XtX matrix
         val triangleSize = (factors*factors - factors)/2 + factors
-        val matrix = DoubleMatrix.zeros(triangleSize)
-        val fullMatrix = DoubleMatrix.zeros(factors, factors)
-        val userXtX = new ArrayBuffer[DoubleMatrix]()
-        val userXy = new ArrayBuffer[DoubleMatrix]()
+        val matrix = Array.fill(triangleSize)(0.0)
+        val fullMatrix = Array.fill(factors * factors)(0.0)
+        val userXtX = new ArrayBuffer[Array[Double]]()
+        val userXy = new ArrayBuffer[Array[Double]]()
         val numRatings = new ArrayBuffer[Int]()
 
         override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
@@ -341,8 +349,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
             val oldLength = userXtX.length
 
             while(i < (numUsers - oldLength)) {
-              userXtX += DoubleMatrix.zeros(triangleSize)
-              userXy += DoubleMatrix.zeros(factors)
+              userXtX += Array.fill(triangleSize)(0.0)
+              userXy += Array.fill(factors)(0.0)
               numRatings.+=(0)
 
               i += 1
@@ -356,8 +364,9 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
           i = 0
           while(i  < matricesToClear){
             numRatings(i) = 0
-            userXtX(i).fill(0.0f)
-            userXy(i).fill(0.0f)
+
+            util.Arrays.fill(userXtX(i), 0.0)
+            util.Arrays.fill(userXy(i), 0.0)
 
             i += 1
           }
@@ -372,7 +381,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
 
             var p = 0
             while(p < blockFactors.length){
-              val vector = new DoubleMatrix(blockFactors(p))
+              val vector = blockFactors(p)
+
               outerProduct(vector, matrix, factors)
 
               val (users, ratings) = inInfo.ratingsForBlock(itemBlock)(p)
@@ -380,8 +390,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
               var i = 0
               while (i < users.length) {
                 numRatings(users(i)) += 1
-                userXtX(users(i)).addi(matrix)
-                SimpleBlas.axpy(ratings(i), vector, userXy(users(i)))
+                blas.daxpy(matrix.length, 1, matrix, 1, userXtX(users(i)), 1)
+                blas.daxpy(vector.length, ratings(i), vector, 1, userXy(users(i)), 1)
 
                 i += 1
               }
@@ -401,12 +411,14 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
 
             // add regularization constant
             while(f < factors){
-              fullMatrix.data(f*factors + f) += lambda * numRatings(i)
+              fullMatrix(f*factors + f) += lambda * numRatings(i)
               f += 1
             }
 
             // calculate new user vector
-            array(i) = Solve.solvePositive(fullMatrix, userXy(i)).data
+            val result = new intW(0)
+            lapack.dposv("U", factors, 1, fullMatrix, factors , userXy(i), factors, result)
+            array(i) = userXy(i)
 
             i += 1
           }
@@ -696,16 +708,13 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
 
   // ================================ Math helper functions ========================================
 
-  def outerProduct(vector: DoubleMatrix, matrix: DoubleMatrix, factors: Int): Unit = {
-    val vd =  vector.data
-    val md = matrix.data
-
+  def outerProduct(vector: Array[Double], matrix: Array[Double], factors: Int): Unit = {
     var row = 0
     var pos = 0
     while(row < factors){
       var col = 0
       while(col <= row){
-        md(pos) = vd(row) * vd(col)
+        matrix(pos) = vector(row) * vector(col)
         col += 1
         pos += 1
       }
@@ -714,24 +723,22 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
     }
   }
 
-  def generateFullMatrix(triangularMatrix: DoubleMatrix, fullMatrix: DoubleMatrix,
+  def generateFullMatrix(triangularMatrix: Array[Double], fullMatrix: Array[Double],
                          factors: Int): Unit = {
     var row = 0
     var pos = 0
-    val fmd = fullMatrix.data
-    val tmd = triangularMatrix.data
 
     while(row < factors){
       var col = 0
       while(col < row){
-        fmd(row*factors + col) = tmd(pos)
-        fmd(col*factors + row) = tmd(pos)
+        fullMatrix(row*factors + col) = triangularMatrix(pos)
+        fullMatrix(col*factors + row) = triangularMatrix(pos)
 
         pos += 1
         col += 1
       }
 
-      fmd(row*factors + row) = tmd(pos)
+      fullMatrix(row*factors + row) = triangularMatrix(pos)
 
       pos += 1
       row += 1
@@ -893,7 +900,8 @@ object ALS {
   * @param itemFactors Calcualted item matrix
   * @param lambda Regularization value used to calculate the model
   */
-class ALSModel(@transient val userFactors: DataSet[Factors],@transient val itemFactors: DataSet[Factors],
+class ALSModel(@transient val userFactors: DataSet[Factors],
+               @transient val itemFactors: DataSet[Factors],
                val lambda: Double) extends Transformer[(Int, Int), (Int, Int, Double)] with
 Serializable{
 
@@ -905,10 +913,10 @@ Serializable{
       triple => {
         val (((uID, iID), uFactors), iFactors) = triple
 
-        val uFactorsVector = new DoubleMatrix(uFactors.factors)
-        val iFactorsVector = new DoubleMatrix(iFactors.factors)
+        val uFactorsVector = uFactors.factors
+        val iFactorsVector = iFactors.factors
 
-        val prediction = SimpleBlas.dot(uFactorsVector, iFactorsVector)
+        val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
 
         (uID, iID, prediction)
       }
@@ -925,13 +933,13 @@ Serializable{
       triple => {
         val (((uID, iID), uFactors), iFactors) = triple
 
-        val uFactorsVector = new DoubleMatrix(uFactors.factors)
-        val iFactorsVector = new DoubleMatrix(iFactors.factors)
+        val uFactorsVector = uFactors.factors
+        val iFactorsVector = iFactors.factors
 
-        val squaredUNorm2 = uFactorsVector.dot(uFactorsVector)
-        val squaredINorm2 = iFactorsVector.dot(iFactorsVector)
+        val squaredUNorm2 = blas.ddot(uFactorsVector.length, uFactorsVector, 1, uFactorsVector, 1)
+        val squaredINorm2 = blas.ddot(iFactorsVector.length, iFactorsVector, 1, iFactorsVector, 1)
 
-        val prediction = SimpleBlas.dot(uFactorsVector, iFactorsVector)
+        val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
 
         (uID, iID, prediction, squaredUNorm2, squaredINorm2)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
index 523d132..8060d2b 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
@@ -23,11 +23,10 @@ import org.apache.flink.api.scala.DataSet
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.ml.math.Vector
 import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.JBlas._
 
 import org.apache.flink.api.scala._
 
-import org.jblas.{SimpleBlas, DoubleMatrix}
+import com.github.fommil.netlib.BLAS.{ getInstance => blas }
 
 /** Multiple linear regression using the ordinary least squares (OLS) estimator.
   *
@@ -77,8 +76,9 @@ import org.jblas.{SimpleBlas, DoubleMatrix}
   *
   *  - [[MultipleLinearRegression.Stepsize]]:
   *  Initial step size for the gradient descent method.
-  *  This value controls how far the gradient descent method moves in the opposite direction of the gradient.
-  *  Tuning this parameter might be crucial to make it stable and to obtain a better performance.
+  *  This value controls how far the gradient descent method moves in the opposite direction of the
+  *  gradient. Tuning this parameter might be crucial to make it stable and to obtain a better
+  *  performance.
   *
   *  - [[MultipleLinearRegression.ConvergenceThreshold]]:
   *  Threshold for relative change of sum of squared residuals until convergence.
@@ -113,7 +113,11 @@ with Serializable {
     val convergenceThreshold = map.get(ConvergenceThreshold)
 
     // calculate dimension of the feature vectors
-    val dimension = input.map{_.vector.size}.reduce { math.max(_, _) }
+    val dimension = input.map{_.vector.size}.reduce {
+      (a, b) =>
+        require(a == b, "All input vector must have the same dimension.")
+        a
+    }
 
     // initial weight vector is set to 0
     val initialWeightVector = createInitialWeightVector(dimension)
@@ -150,7 +154,9 @@ with Serializable {
                   val (leftBetas, leftBeta0, leftCount) = left
                   val (rightBetas, rightBeta0, rightCount) = right
 
-                  (leftBetas.add(rightBetas), leftBeta0 + rightBeta0, leftCount + rightCount)
+                  blas.daxpy(leftBetas.length, 1.0, rightBetas, 1, leftBetas, 1)
+
+                  (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
             }.map {
               new LinearRegressionWeightsUpdate(stepsize)
             }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
@@ -197,7 +203,8 @@ with Serializable {
                 val (leftBetas, leftBeta0, leftCount) = left
                 val (rightBetas, rightBeta0, rightCount) = right
 
-                (leftBetas.add(rightBetas), leftBeta0 + rightBeta0, leftCount + rightCount)
+                blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1)
+                (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
             }.map {
               new LinearRegressionWeightsUpdate(stepsize)
             }.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
@@ -216,11 +223,11 @@ with Serializable {
     * @return DataSet of a zero vector of dimension d
     */
   private def createInitialWeightVector(dimensionDS: DataSet[Int]):
-  DataSet[(DoubleMatrix, Double)] = {
+  DataSet[(Array[Double], Double)] = {
     dimensionDS.map {
       dimension =>
         val values = Array.fill(dimension)(0.0)
-        (new DoubleMatrix(dimension, 1, values: _*), 0.0)
+        (values, 0.0)
     }
   }
 }
@@ -261,13 +268,13 @@ object MultipleLinearRegression {
 private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] {
   import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
 
-  var weightVector: DoubleMatrix = null
+  var weightVector: Array[Double] = null
   var weight0: Double = 0.0
 
   @throws(classOf[Exception])
   override def open(configuration: Configuration): Unit = {
     val list = this.getRuntimeContext.
-      getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+      getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
 
     val weightsPair = list.get(0)
 
@@ -279,7 +286,9 @@ private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] {
     val vector = value.vector
     val label = value.label
 
-    val residual = weightVector.dot(vector) + weight0 - label
+    val dotProduct = blas.ddot(weightVector.length, weightVector, 1, vector, 1)
+
+    val residual = dotProduct + weight0 - label
 
     residual*residual
   }
@@ -294,17 +303,17 @@ private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] {
   * The weight vector is received as a broadcast variable.
   */
 private class LinearRegressionGradientDescent extends
-RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] {
+RichMapFunction[LabeledVector, (Array[Double], Double, Int)] {
 
   import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
 
-  var weightVector: DoubleMatrix = null
+  var weightVector: Array[Double] = null
   var weight0: Double = 0.0
 
   @throws(classOf[Exception])
   override def open(configuration: Configuration): Unit = {
     val list = this.getRuntimeContext.
-      getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+      getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
 
     val weightsPair = list.get(0)
 
@@ -312,13 +321,19 @@ RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] {
     weight0 = weightsPair._2
   }
 
-  override def map(value: LabeledVector): (DoubleMatrix, Double, Int) = {
+  override def map(value: LabeledVector): (Array[Double], Double, Int) = {
     val x = value.vector
     val label = value.label
 
-    val error = weightVector.dot(x) + weight0 - label
+    val dotProduct = blas.ddot(weightVector.length, weightVector, 1, x, 1)
+
+    val error = dotProduct + weight0 - label
+
+    // reuse vector x
+    val weightsGradient = x
+
+    blas.dscal(weightsGradient.length, 2*error, weightsGradient, 1)
 
-    val weightsGradient = x.mul(2 * error)
     val weight0Gradient = 2 * error
 
     (weightsGradient, weight0Gradient, 1)
@@ -332,17 +347,17 @@ RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] {
   * @param stepsize Initial value of the step size used to update the weight vector
   */
 private class LinearRegressionWeightsUpdate(val stepsize: Double) extends
-RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
+RichMapFunction[(Array[Double], Double, Int), (Array[Double], Double)] {
 
   import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
 
-  var weights: DoubleMatrix = null
+  var weights: Array[Double] = null
   var weight0: Double = 0.0
 
   @throws(classOf[Exception])
   override def open(configuration: Configuration): Unit = {
     val list = this.getRuntimeContext.
-      getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+      getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
 
     val weightsPair = list.get(0)
 
@@ -350,8 +365,10 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
     weight0 = weightsPair._2
   }
 
-  override def map(value: (DoubleMatrix, Double, Int)): (DoubleMatrix, Double) = {
-    val weightsGradient = value._1.div(value._3)
+  override def map(value: (Array[Double], Double, Int)): (Array[Double], Double) = {
+    val weightsGradient = value._1
+    blas.dscal(weightsGradient.length, 1.0/value._3, weightsGradient, 1)
+
     val weight0Gradient = value._2 / value._3
 
     val iteration = getIterationRuntimeContext.getSuperstepNumber
@@ -360,9 +377,8 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
     // decreasing
     val effectiveStepsize = stepsize/math.sqrt(iteration)
 
-    val newWeights = new DoubleMatrix(weights.rows, weights.columns)
-    newWeights.copy(weights)
-    SimpleBlas.axpy( -effectiveStepsize, weightsGradient, newWeights)
+    val newWeights = weights.clone
+    blas.daxpy(newWeights.length, -effectiveStepsize, weightsGradient, 1, newWeights, 1)
     val newWeight0 = weight0 - effectiveStepsize * weight0Gradient
 
     (newWeights, newWeight0)
@@ -383,7 +399,7 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
   * @param weights DataSet containing the calculated weight vector
   */
 class MultipleLinearRegressionModel private[regression]
-(val weights: DataSet[(DoubleMatrix, Double)]) extends
+(val weights: DataSet[(Array[Double], Double)]) extends
 Transformer[ Vector, LabeledVector ] {
 
   import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
@@ -403,13 +419,14 @@ Transformer[ Vector, LabeledVector ] {
   }
 
   private class LinearRegressionPrediction extends RichMapFunction[Vector, LabeledVector] {
-    private var weights: DoubleMatrix = null
+    private var weights: Array[Double] = null
     private var weight0: Double = 0
 
 
     @throws(classOf[Exception])
     override def open(configuration: Configuration): Unit = {
-      val t = getRuntimeContext.getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+      val t = getRuntimeContext
+        .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
 
       val weightsPair = t.get(0)
 
@@ -418,7 +435,9 @@ Transformer[ Vector, LabeledVector ] {
     }
 
     override def map(value: Vector): LabeledVector = {
-      val prediction = weights.dot(value) + weight0
+      val dotProduct = blas.ddot(weights.length, weights, 1, value, 1)
+
+      val prediction = dotProduct + weight0
 
       LabeledVector(value, prediction)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
new file mode 100644
index 0000000..28fdfa6
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class PolynomialBaseITCase extends ShouldMatchers {
+
+  @Test
+  def testMapElementToPolynomialVectorSpace (): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism (2)
+
+    val input = Seq (
+    LabeledVector (DenseVector (1), 1.0),
+    LabeledVector (DenseVector (2), 2.0)
+    )
+
+    val inputDS = env.fromCollection (input)
+
+    val transformer = PolynomialBase ()
+    .setDegree (3)
+
+    val transformedDS = transformer.transform (inputDS)
+
+    val expectedMap = List (
+    (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
+    (2.0 -> DenseVector (8.0, 4.0, 2.0) )
+    ) toMap
+
+    val result = transformedDS.collect
+
+    for (entry <- result) {
+    expectedMap.contains (entry.label) should be (true)
+    entry.vector should equal (expectedMap (entry.label) )
+    }
+  }
+
+  @Test
+  def testMapVectorToPolynomialVectorSpace(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val input = Seq(
+      LabeledVector(DenseVector(2, 3), 1.0),
+      LabeledVector(DenseVector(2, 3, 4), 2.0)
+    )
+
+    val expectedMap = List(
+      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
+      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
+        9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
+    ) toMap
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialBase()
+      .setDegree(3)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect
+
+    for(entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+
+  @Test
+  def testReturnEmptyVectorIfDegreeIsZero(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val input = Seq(
+      LabeledVector(DenseVector(2, 3), 1.0),
+      LabeledVector(DenseVector(2, 3, 4), 2.0)
+    )
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialBase()
+      .setDegree(0)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect
+
+    val expectedMap = List(
+      (1.0 -> DenseVector()),
+      (2.0 -> DenseVector())
+    ) toMap
+
+    for(entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+}
+
+object PolynomialBaseITCase {
+  @BeforeClass
+  def setup(): Unit = {
+    CliFrontendTestUtils.pipeSystemOutToNull()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala
deleted file mode 100644
index 8da822f..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.feature
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.DenseVector
-import org.scalatest.{ShouldMatchers, FlatSpec}
-
-import org.apache.flink.api.scala._
-
-class PolynomialBaseSuite extends FlatSpec with ShouldMatchers {
-  behavior of "A PolynomialBase"
-
-  it should "map an element into a polynomial vector space" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = Seq(
-      LabeledVector(DenseVector(1), 1.0),
-      LabeledVector(DenseVector(2), 2.0)
-    )
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialBase()
-    .setDegree(3)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val expectedMap = List(
-      (1.0 -> DenseVector(1.0, 1.0, 1.0)),
-      (2.0 -> DenseVector(8.0, 4.0, 2.0))
-    ) toMap
-
-    val result = transformedDS.collect
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-
-  }
-
-  it should "map a vector into a polynomial vector space" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = Seq(
-      LabeledVector(DenseVector(2, 3), 1.0),
-      LabeledVector(DenseVector(2, 3, 4), 2.0)
-    )
-
-    val expectedMap = List(
-      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
-      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
-      9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
-    ) toMap
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialBase()
-    .setDegree(3)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val result = transformedDS.collect
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-
-    println(result)
-  }
-
-  it should "return an empty vector if the polynomial degree is set to 0" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = Seq(
-      LabeledVector(DenseVector(2, 3), 1.0),
-      LabeledVector(DenseVector(2, 3, 4), 2.0)
-    )
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialBase()
-    .setDegree(0)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val result = transformedDS.collect
-
-    val expectedMap = List(
-      (1.0 -> DenseVector()),
-      (2.0 -> DenseVector())
-    ) toMap
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
new file mode 100644
index 0000000..f2c52d3
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class ALSITCase extends ShouldMatchers {
+
+  @Test
+  def testMatrixFactorization(): Unit = {
+    import ALSData._
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val als = ALS()
+      .setIterations(iterations)
+      .setLambda(lambda)
+      .setBlocks(4)
+      .setNumFactors(numFactors)
+
+    val inputDS = env.fromCollection(data)
+
+    val model = als.fit(inputDS)
+
+    val testData = env.fromCollection(expectedResult.map{
+      case (userID, itemID, rating) => (userID, itemID)
+    })
+
+    val predictions = model.transform(testData).collect
+
+    predictions.length should equal(expectedResult.length)
+
+    val resultMap = expectedResult map {
+      case (uID, iID, value) => (uID, iID) -> value
+    } toMap
+
+    predictions foreach {
+      case (uID, iID, value) => {
+        resultMap.isDefinedAt(((uID, iID))) should be(true)
+
+        value should be(resultMap((uID, iID)) +- 0.1)
+      }
+    }
+
+    val risk = model.empiricalRisk(inputDS).collect(0)
+
+    risk should be(expectedEmpiricalRisk +- 1)
+  }
+}
+
+object ALSITCase {
+
+  @BeforeClass
+  def setup(): Unit = {
+    CliFrontendTestUtils.pipeSystemOutToNull()
+  }
+}
+
+object ALSData {
+
+  val iterations = 9
+  val lambda = 1.0
+  val numFactors = 5
+
+  val data: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2,13,534.3937734561154),
+      (6,14,509.63176469621936),
+      (4,14,515.8246770897443),
+      (7,3,495.05234565105),
+      (2,3,532.3281786219485),
+      (5,3,497.1906356844367),
+      (3,3,512.0640508585093),
+      (10,3,500.2906742233019),
+      (1,4,521.9189079662882),
+      (2,4,515.0734651491396),
+      (1,7,522.7532725967008),
+      (8,4,492.65683825096403),
+      (4,8,492.65683825096403),
+      (10,8,507.03319667905413),
+      (7,1,522.7532725967008),
+      (1,1,572.2230209271174),
+      (2,1,563.5849190220224),
+      (6,1,518.4844061038742),
+      (9,1,529.2443732217674),
+      (8,1,543.3202505434103),
+      (7,2,516.0188923307859),
+      (1,2,563.5849190220224),
+      (1,11,515.1023793011227),
+      (8,2,536.8571133978352),
+      (2,11,507.90776961762225),
+      (3,2,532.3281786219485),
+      (5,11,476.24185144363304),
+      (4,2,515.0734651491396),
+      (4,11,469.92049343738233),
+      (3,12,509.4713776280098),
+      (4,12,494.6533165132021),
+      (7,5,482.2907867916308),
+      (6,5,477.5940040923741),
+      (4,5,480.9040684364228),
+      (1,6,518.4844061038742),
+      (6,6,470.6605085832807),
+      (8,6,489.6360564705307),
+      (4,6,472.74052954447046),
+      (7,9,482.5837650471611),
+      (5,9,487.00175463269863),
+      (9,9,500.69514584780944),
+      (4,9,477.71644808419325),
+      (7,10,485.3852917539852),
+      (8,10,507.03319667905413),
+      (3,10,500.2906742233019),
+      (5,15,488.08215944254437),
+      (6,15,480.16929757607346)
+    )
+  }
+
+  val expectedResult: Seq[(Int, Int, Double)] = {
+    Seq(
+      (2, 2, 526.1037),
+      (5, 9, 468.5680),
+      (10, 3, 484.8975),
+      (5, 13, 451.6228),
+      (1, 15, 493.4956),
+      (4, 11, 456.3862)
+    )
+  }
+
+  val expectedEmpiricalRisk = 505374.1877
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
deleted file mode 100644
index 770d4d2..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.recommendation
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.scalatest.{ShouldMatchers, FlatSpec}
-
-import org.apache.flink.api.scala._
-
-class ALSSuite extends FlatSpec with ShouldMatchers {
-
-  behavior of "ALS"
-
-  it should "factorize a given matrix" in {
-    import ALSData._
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val als = ALS()
-    .setIterations(iterations)
-    .setLambda(lambda)
-    .setBlocks(4)
-    .setNumFactors(numFactors)
-
-    val inputDS = env.fromCollection(data)
-
-    val model = als.fit(inputDS)
-
-    val testData = env.fromCollection(expectedResult.map{
-      case (userID, itemID, rating) => (userID, itemID)
-    })
-
-    val predictions = model.transform(testData).collect
-
-    predictions.length should equal(expectedResult.length)
-
-    val resultMap = expectedResult map {
-      case (uID, iID, value) => (uID, iID) -> value
-    } toMap
-
-    predictions foreach {
-      case (uID, iID, value) => {
-        resultMap.isDefinedAt(((uID, iID))) should be(true)
-
-        value should be(resultMap((uID, iID)) +- 0.1)
-      }
-    }
-
-    val risk = model.empiricalRisk(inputDS).collect(0)
-
-    risk should be(expectedEmpiricalRisk +- 1)
-  }
-}
-
-object ALSData {
-
-  val iterations = 9
-  val lambda = 1.0
-  val numFactors = 5
-
-  val data: Seq[(Int, Int, Double)] = {
-    Seq(
-      (2,13,534.3937734561154),
-      (6,14,509.63176469621936),
-      (4,14,515.8246770897443),
-      (7,3,495.05234565105),
-      (2,3,532.3281786219485),
-      (5,3,497.1906356844367),
-      (3,3,512.0640508585093),
-      (10,3,500.2906742233019),
-      (1,4,521.9189079662882),
-      (2,4,515.0734651491396),
-      (1,7,522.7532725967008),
-      (8,4,492.65683825096403),
-      (4,8,492.65683825096403),
-      (10,8,507.03319667905413),
-      (7,1,522.7532725967008),
-      (1,1,572.2230209271174),
-      (2,1,563.5849190220224),
-      (6,1,518.4844061038742),
-      (9,1,529.2443732217674),
-      (8,1,543.3202505434103),
-      (7,2,516.0188923307859),
-      (1,2,563.5849190220224),
-      (1,11,515.1023793011227),
-      (8,2,536.8571133978352),
-      (2,11,507.90776961762225),
-      (3,2,532.3281786219485),
-      (5,11,476.24185144363304),
-      (4,2,515.0734651491396),
-      (4,11,469.92049343738233),
-      (3,12,509.4713776280098),
-      (4,12,494.6533165132021),
-      (7,5,482.2907867916308),
-      (6,5,477.5940040923741),
-      (4,5,480.9040684364228),
-      (1,6,518.4844061038742),
-      (6,6,470.6605085832807),
-      (8,6,489.6360564705307),
-      (4,6,472.74052954447046),
-      (7,9,482.5837650471611),
-      (5,9,487.00175463269863),
-      (9,9,500.69514584780944),
-      (4,9,477.71644808419325),
-      (7,10,485.3852917539852),
-      (8,10,507.03319667905413),
-      (3,10,500.2906742233019),
-      (5,15,488.08215944254437),
-      (6,15,480.16929757607346)
-    )
-  }
-
-  val expectedResult: Seq[(Int, Int, Double)] = {
-    Seq(
-      (2, 2, 526.1037),
-      (5, 9, 468.5680),
-      (10, 3, 484.8975),
-      (5, 13, 451.6228),
-      (1, 15, 493.4956),
-      (4, 11, 456.3862)
-    )
-  }
-
-  val expectedEmpiricalRisk = 505374.1877
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
new file mode 100644
index 0000000..eb825b9
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.apache.flink.ml.common.ParameterMap
+import org.apache.flink.ml.feature.PolynomialBase
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class MultipleLinearRegressionITCase extends ShouldMatchers {
+
+  @Test
+  def testEstimationOfLinearFunction(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val learner = MultipleLinearRegression()
+
+    import RegressionData._
+
+    val parameters = ParameterMap()
+
+    parameters.add(MultipleLinearRegression.Stepsize, 1.0)
+    parameters.add(MultipleLinearRegression.Iterations, 10)
+    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+    val inputDS = env.fromCollection(data)
+    val model = learner.fit(inputDS, parameters)
+
+    val weightList = model.weights.collect
+
+    weightList.size should equal(1)
+
+    val (weights, weight0) = weightList(0)
+
+    expectedWeights zip weights foreach {
+      case (expectedWeight, weight) =>
+        weight should be (expectedWeight +- 1)
+    }
+    weight0 should be (expectedWeight0 +- 0.4)
+
+    val srs = model.squaredResidualSum(inputDS).collect(0)
+
+    srs should be (expectedSquaredResidualSum +- 2)
+  }
+
+  @Test
+  def testEstimationOfCubicFunction(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setDegreeOfParallelism(2)
+
+    val polynomialBase = PolynomialBase()
+    val learner = MultipleLinearRegression()
+
+    val pipeline = polynomialBase.chain(learner)
+
+    val inputDS = env.fromCollection(RegressionData.polynomialData)
+
+    val parameters = ParameterMap()
+      .add(PolynomialBase.Degree, 3)
+      .add(MultipleLinearRegression.Stepsize, 0.002)
+      .add(MultipleLinearRegression.Iterations, 100)
+
+    val model = pipeline.fit(inputDS, parameters)
+
+    val weightList = model.weights.collect
+
+    weightList.size should equal(1)
+
+    val (weights, weight0) = weightList(0)
+
+    RegressionData.expectedPolynomialWeights.zip(weights) foreach {
+      case (expectedWeight, weight) =>
+        weight should be(expectedWeight +- 0.1)
+    }
+
+    weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1)
+
+    val transformedInput = polynomialBase.transform(inputDS, parameters)
+
+    val srs = model.squaredResidualSum(transformedInput).collect(0)
+
+    srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
+  }
+}
+
+object MultipleLinearRegressionITCase{
+
+  @BeforeClass
+  def setup(): Unit = {
+    CliFrontendTestUtils.pipeSystemOutToNull()
+  }
+}