You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/03/17 23:45:26 UTC
[7/8] flink git commit: [FLINK-1697] [ml] Adds web documentation for
alternating least squares. Adds web documentation for polynomial base feature
mapper.
[FLINK-1697] [ml] Adds web documentation for alternating least squares. Adds web documentation for polynomial base feature mapper.
[ml] Adds comments
[ml] Set degree of parallelism of test suites to 2
[ml] Replaces FlatSpec tests with JUnit integration test cases in order to suppress the sysout output.
[ml] Adds missing clients-test jar
[docs] Sets jekyll's baseurl to http://ci.apache.org/projects/flink/flink-docs-master
[ml] Replaces JBlas by java netlib to avoid license issues of included fortran libraries
[ml] Adds com.github.fommil.netlib:core to license file
[ml] Adds Scala docs to FlinkTools
[ml] Adds comments to LabeledVector and the math package object
This closes #479.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21e2d96f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21e2d96f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21e2d96f
Branch: refs/heads/master
Commit: 21e2d96f893e4460a8d85c501e31dc09ed2f0043
Parents: ff83c8c
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Mar 10 15:41:40 2015 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Mar 17 23:28:34 2015 +0100
----------------------------------------------------------------------
LICENSE | 1 -
docs/_config.yml | 4 +-
docs/build_docs.sh | 2 +-
docs/ml/alternating_least_squares.md | 131 +++++++++++++++-
docs/ml/multiple_linear_regression.md | 57 ++++---
docs/ml/polynomial_base_feature_extractor.md | 28 ----
docs/ml/polynomial_base_feature_mapper.md | 91 +++++++++++
flink-clients/pom.xml | 11 ++
flink-dist/src/main/flink-bin/LICENSE | 1 +
.../apache/flink/runtime/client/JobClient.scala | 2 +-
flink-staging/flink-ml/pom.xml | 14 +-
.../apache/flink/ml/common/ChainedLearner.scala | 13 ++
.../flink/ml/common/ChainedTransformer.scala | 12 ++
.../org/apache/flink/ml/common/FlinkTools.scala | 88 +++++++++--
.../apache/flink/ml/common/LabeledVector.scala | 6 +
.../org/apache/flink/ml/common/Learner.scala | 14 +-
.../apache/flink/ml/common/Transformer.scala | 24 ++-
.../flink/ml/feature/PolynomialBase.scala | 4 +-
.../org/apache/flink/ml/math/DenseVector.scala | 9 ++
.../scala/org/apache/flink/ml/math/JBlas.scala | 70 ---------
.../scala/org/apache/flink/ml/math/Vector.scala | 7 +
.../org/apache/flink/ml/math/package.scala | 9 +-
.../apache/flink/ml/recommendation/ALS.scala | 98 ++++++------
.../regression/MultipleLinearRegression.scala | 79 ++++++----
.../flink/ml/feature/PolynomialBaseITCase.scala | 132 ++++++++++++++++
.../flink/ml/feature/PolynomialBaseSuite.scala | 118 --------------
.../flink/ml/recommendation/ALSITCase.scala | 152 +++++++++++++++++++
.../flink/ml/recommendation/ALSSuite.scala | 141 -----------------
.../MultipleLinearRegressionITCase.scala | 115 ++++++++++++++
.../MultipleLinearRegressionSuite.scala | 100 ------------
.../flink/ml/regression/RegressionData.scala | 3 +-
31 files changed, 956 insertions(+), 580 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/LICENSE
----------------------------------------------------------------------
diff --git a/LICENSE b/LICENSE
index e58d1a5..85d0d85 100644
--- a/LICENSE
+++ b/LICENSE
@@ -228,7 +228,6 @@ The Apache Flink project bundles the following files under the MIT License:
- normalize.css v3.0.0 (http://git.io/normalize) - Copyright (c) Nicolas Gallagher and Jonathan Neal
- Font Awesome - Code (http://fortawesome.github.io/Font-Awesome/) - Copyright (c) 2014 Dave Gandy
- D3 dagre renderer (https://github.com/cpettitt/dagre-d3) - Copyright (c) 2012-2013 Chris Pettitt
- - scopt (http://github.com/scopt/scopt)
All rights reserved.
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/_config.yml
----------------------------------------------------------------------
diff --git a/docs/_config.yml b/docs/_config.yml
index 612aa6f..d7cf349 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -30,7 +30,7 @@ FLINK_SCALA_VERSION_SHORT: "2.10"
FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
FLINK_GITHUB_URL: https://github.com/apache/flink
-FLINK_WEBSITE_URL: http://flink.apache.org/
+FLINK_WEBSITE_URL: http://flink.apache.org
FLINK_DOWNLOAD_URL: http://flink.apache.org/downloads.html
FLINK_DOWNLOAD_URL_HADOOP1_STABLE: http://www.apache.org/dyn/closer.cgi/flink/flink-0.8.1/flink-0.8.1-bin-hadoop1.tgz
@@ -59,3 +59,5 @@ kramdown:
toc_levels: 1..3
host: localhost
+
+baseurl: http://ci.apache.org/projects/flink/flink-docs-master
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/build_docs.sh
----------------------------------------------------------------------
diff --git a/docs/build_docs.sh b/docs/build_docs.sh
index 4f8a7c9..b65f7c9 100755
--- a/docs/build_docs.sh
+++ b/docs/build_docs.sh
@@ -54,7 +54,7 @@ JEKYLL_CMD="build"
while getopts ":p" opt; do
case $opt in
p)
- JEKYLL_CMD="serve --watch"
+ JEKYLL_CMD="serve --baseurl \"\" --watch"
;;
esac
done
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/alternating_least_squares.md
----------------------------------------------------------------------
diff --git a/docs/ml/alternating_least_squares.md b/docs/ml/alternating_least_squares.md
index bf97b1b..7a4a5d5 100644
--- a/docs/ml/alternating_least_squares.md
+++ b/docs/ml/alternating_least_squares.md
@@ -1,4 +1,5 @@
---
+mathjax: include
title: Alternating Least Squares
---
<!--
@@ -25,4 +26,132 @@ under the License.
## Description
-## Parameters
\ No newline at end of file
+The alternating least squares (ALS) algorithm factorizes a given matrix $R$ into two factors $U$ and $V$ such that $R \approx U^TV$.
+The unknown row dimension is given as a parameter to the algorithm and is called latent factors.
+Since matrix factorization can be used in the context of recommendation, the matrices $U$ and $V$ can be called user and item matrix, respectively.
+The $i$th column of the user matrix is denoted by $u_i$ and the $i$th column of the item matrix is $v_i$.
+The matrix $R$ can be called the ratings matrix with $$(R)_{i,j} = r_{i,j}$$.
+
+In order to find the user and item matrix, the following problem is solved:
+
+$$\arg\min_{U,V} \sum_{\{i,j\mid r_{i,j} \not= 0\}} \left(r_{i,j} - u_{i}^Tv_{j}\right)^2 +
+\lambda \left(\sum_{i} n_{u_i} \left\lVert u_i \right\rVert^2 + \sum_{j} n_{v_j} \left\lVert v_j \right\rVert^2 \right)$$
+
+with $\lambda$ being the regularization factor, $$n_{u_i}$$ being the number of items the user $i$ has rated and $$n_{v_j}$$ being the number of times the item $j$ has been rated.
+This regularization scheme to avoid overfitting is called weighted-$\lambda$-regularization.
+Details can be found in the work of [Zhou et al.](http://dx.doi.org/10.1007/978-3-540-68880-8_32).
+
+By fixing one of the matrices $U$ or $V$, we obtain a quadratic form which can be solved directly.
+The solution of the modified problem is guaranteed to monotonically decrease the overall cost function.
+By applying this step alternately to the matrices $U$ and $V$, we can iteratively improve the matrix factorization.
+
+The matrix $R$ is given in its sparse representation as a tuple of $(i, j, r)$ where $i$ denotes the row index, $j$ the column index and $r$ is the matrix value at position $(i,j)$.
+
+
+## Parameters
+
+The alternating least squares implementation can be controlled by the following parameters:
+
+ <table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Parameters</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>NumFactors</strong></td>
+ <td>
+ <p>
+ The number of latent factors to use for the underlying model.
+ It is equivalent to the dimension of the calculated user and item vectors.
+ (Default value: <strong>10</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Lambda</strong></td>
+ <td>
+ <p>
+ Regularization factor. Tune this value in order to avoid overfitting or poor performance due to strong generalization.
+ (Default value: <strong>1</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Iterations</strong></td>
+ <td>
+ <p>
+ The maximum number of iterations.
+ (Default value: <strong>10</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Blocks</strong></td>
+ <td>
+ <p>
+ The number of blocks into which the user and item matrix are grouped.
+ The fewer blocks one uses, the less data is sent redundantly.
+ However, bigger blocks entail bigger update messages which have to be stored on the heap.
+ If the algorithm fails because of an OutOfMemoryException, then try to increase the number of blocks.
+ (Default value: <strong>None</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Seed</strong></td>
+ <td>
+ <p>
+ Random seed used to generate the initial item matrix for the algorithm.
+ (Default value: <strong>0</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>TemporaryPath</strong></td>
+ <td>
+ <p>
+ Path to a temporary directory into which intermediate results are stored.
+ If this value is set, then the algorithm is split into two preprocessing steps, the ALS iteration and a post-processing step which calculates a last ALS half-step.
+ The preprocessing steps calculate the <code>OutBlockInformation</code> and <code>InBlockInformation</code> for the given rating matrix.
+ The results of the individual steps are stored in the specified directory.
+ By splitting the algorithm into multiple smaller steps, Flink does not have to split the available memory amongst too many operators.
+ This allows the system to process bigger individual messages and improves the overall performance.
+ (Default value: <strong>None</strong>)
+ </p>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+
+## Examples
+
+{% highlight scala %}
+// Read input data set from a csv file
+val inputDS: DataSet[(Int, Int, Double)] = env.readCsvFile[(Int, Int, Double)](
+ pathToTrainingFile)
+
+// Setup the ALS learner
+val als = ALS()
+.setIterations(10)
+.setNumFactors(10)
+.setBlocks(100)
+.setTemporaryPath("hdfs://tempPath")
+
+// Set the other parameters via a parameter map
+val parameters = ParameterMap()
+.add(ALS.Lambda, 0.9)
+.add(ALS.Seed, 42)
+
+// Calculate the factorization
+val factorization = als.fit(inputDS, parameters)
+
+// Read the testing data set from a csv file
+val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
+
+// Calculate the ratings according to the matrix factorization
+val predictedRatings = factorization.transform(testingDS)
+{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/multiple_linear_regression.md
----------------------------------------------------------------------
diff --git a/docs/ml/multiple_linear_regression.md b/docs/ml/multiple_linear_regression.md
index e98ccc0..840e899 100644
--- a/docs/ml/multiple_linear_regression.md
+++ b/docs/ml/multiple_linear_regression.md
@@ -62,21 +62,46 @@ under the License.
## Parameters
The multiple linear regression implementation can be controlled by the following parameters:
-
-Iterations
-: The maximum number of iterations.
-(Default value: **10**)
-
-Stepsize
-: Initial step size for the gradient descent method.
-This value controls how far the gradient descent method moves in the opposite direction of the gradient.
-Tuning this parameter might be crucial to make it stable and to obtain a better performance.
-(Default value: **0.1**)
-
-ConvergenceThreshold
-: Threshold for relative change of the sum of squared residuals until the iteration is stopped.
-(Default value: **None**)
-
+
+ <table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Parameters</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>Iterations</strong></td>
+ <td>
+ <p>
+ The maximum number of iterations. (Default value: <strong>10</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Stepsize</strong></td>
+ <td>
+ <p>
+ Initial step size for the gradient descent method.
+ This value controls how far the gradient descent method moves in the opposite direction of the gradient.
+ Tuning this parameter might be crucial to make it stable and to obtain a better performance.
+ (Default value: <strong>0.1</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>ConvergenceThreshold</strong></td>
+ <td>
+ <p>
+ Threshold for relative change of the sum of squared residuals until the iteration is stopped.
+ (Default value: <strong>None</strong>)
+ </p>
+ </td>
+ </tr>
+ </tbody>
+ </table>
## Examples
@@ -97,5 +122,3 @@ val model = mlr.fit(trainingDS)
// Calculate the predictions for the test data
val predictions = model.transform(testingDS)
{% endhighlight %}
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/polynomial_base_feature_extractor.md
----------------------------------------------------------------------
diff --git a/docs/ml/polynomial_base_feature_extractor.md b/docs/ml/polynomial_base_feature_extractor.md
deleted file mode 100644
index ececec3..0000000
--- a/docs/ml/polynomial_base_feature_extractor.md
+++ /dev/null
@@ -1,28 +0,0 @@
----
-title: Polynomial Base Feature Extractor
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-## Description
-
-## Parameters
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/docs/ml/polynomial_base_feature_mapper.md
----------------------------------------------------------------------
diff --git a/docs/ml/polynomial_base_feature_mapper.md b/docs/ml/polynomial_base_feature_mapper.md
new file mode 100644
index 0000000..2964f04
--- /dev/null
+++ b/docs/ml/polynomial_base_feature_mapper.md
@@ -0,0 +1,91 @@
+---
+mathjax: include
+title: Polynomial Base Feature Mapper
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+The polynomial base feature mapper maps a vector into the polynomial feature space of degree $d$.
+The dimension of the input vector determines the number of polynomial factors whose values are the respective vector entries.
+Given a vector $(x, y, z, \ldots)^T$ the resulting feature vector looks like:
+
+$$\left(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xy^2, xyz, xz^2, y^3, \ldots\right)^T$$
+
+Flink's implementation orders the polynomials in decreasing order of their degree.
+
+Given the vector $\left(3,2\right)^T$, the polynomial base feature vector of degree 3 would look like
+
+ $$\left(3^3, 3^2\cdot2, 3\cdot2^2, 2^3, 3^2, 3\cdot2, 2^2, 3, 2\right)^T$$
+
+This transformer can be prepended to all `Transformer` and `Learner` implementations which expect an input of type `LabeledVector`.
+
+## Parameters
+
+The polynomial base feature mapper can be controlled by the following parameters:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Parameters</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>Degree</strong></td>
+ <td>
+ <p>
+ The maximum polynomial degree.
+ (Default value: <strong>10</strong>)
+ </p>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+
+## Examples
+
+{% highlight scala %}
+// Obtain the training data set
+val trainingDS: DataSet[LabeledVector] = ...
+
+// Setup polynomial base feature extractor of degree 3
+val polyBase = PolynomialBase()
+.setDegree(3)
+
+// Setup the multiple linear regression learner
+val mlr = MultipleLinearRegression()
+
+// Control the learner via the parameter map
+val parameters = ParameterMap()
+.add(MultipleLinearRegression.Iterations, 20)
+.add(MultipleLinearRegression.Stepsize, 0.5)
+
+// Create pipeline PolynomialBase -> MultipleLinearRegression
+val chained = polyBase.chain(mlr)
+
+// Learn the model
+val model = chained.fit(trainingDS)
+{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index d7dccad..95d17d7 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -116,6 +116,17 @@ under the License.
<build>
<plugins>
<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-dist/src/main/flink-bin/LICENSE
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/LICENSE b/flink-dist/src/main/flink-bin/LICENSE
index 89d8eca..d0b7fb4 100644
--- a/flink-dist/src/main/flink-bin/LICENSE
+++ b/flink-dist/src/main/flink-bin/LICENSE
@@ -302,6 +302,7 @@ The Apache Flink project bundles the following components under
BSD-style licenses:
[3-clause BSD license]
+ - core (com.github.fommil.netlib:core:1.1.2 - https://github.com/fommil/netlib-java/core)
- Kryo (https://github.com/EsotericSoftware/kryo) - Copyright (c) 2008, Nathan Sweet
- D3 (http://d3js.org/) - Copyright (c) 2010-2014, Michael Bostock
- LevelDB JNI (https://github.com/fusesource/leveldbjni/) - Copyright (c) 2011, FuseSource Corp.
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
index 19b3050..f1c6450 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/client/JobClient.scala
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.jobgraph.{JobID, JobGraph}
+import org.apache.flink.runtime.jobgraph.JobGraph
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.messages.JobClientMessages.{SubmitJobDetached, SubmitJobAndWait}
import org.apache.flink.runtime.messages.JobManagerMessages._
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml
index 53188e1..24ba591 100644
--- a/flink-staging/flink-ml/pom.xml
+++ b/flink-staging/flink-ml/pom.xml
@@ -46,9 +46,9 @@
</dependency>
<dependency>
- <groupId>org.jblas</groupId>
- <artifactId>jblas</artifactId>
- <version>1.2.3</version>
+ <groupId>com.github.fommil.netlib</groupId>
+ <artifactId>core</artifactId>
+ <version>1.1.2</version>
</dependency>
<dependency>
@@ -57,6 +57,14 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
index cd0f403..b1a0a2f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
@@ -20,6 +20,19 @@ package org.apache.flink.ml.common
import org.apache.flink.api.scala.DataSet
+/** This class represents a [[org.apache.flink.ml.common.Learner]] which is chained to a
+ * [[Transformer]].
+ *
+ * Calling the method `fit` on this object will pipe the input data through the given
+ * [[Transformer]], whose output is fed to the [[Learner]].
+ *
+ * @param head Preceding [[Transformer]] pipeline
+ * @param tail [[Learner]] instance
+ * @tparam IN Type of the training data
+ * @tparam TEMP Type of the produced data by the transformer pipeline and input type to the
+ * [[Learner]]
+ * @tparam OUT Type of the trained model
+ */
class ChainedLearner[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
val tail: Learner[TEMP, OUT])
extends Learner[IN, OUT] {
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
index 9a262cb..3f108bf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
@@ -20,6 +20,18 @@ package org.apache.flink.ml.common
import org.apache.flink.api.scala.DataSet
+/** This class represents a chain of multiple [[Transformer]].
+ *
+ * Calling the method `transform` on this object will first apply the preceding [[Transformer]] to
+ * the input data. The resulting output data is then fed to the succeeding [[Transformer]].
+ *
+ * @param head Preceding [[Transformer]]
+ * @param tail Succeeding [[Transformer]]
+ * @tparam IN Type of incoming elements
+ * @tparam TEMP Type of output elements of the preceding [[Transformer]] and input type of
+ * succeeding [[Transformer]]
+ * @tparam OUT Type of outgoing elements
+ */
class ChainedTransformer[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
val tail: Transformer[TEMP, OUT])
extends Transformer[IN, OUT] {
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
index 2b12f30..22bbe82 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkTools.scala
@@ -27,18 +27,24 @@ import org.apache.flink.core.fs.Path
import scala.reflect.ClassTag
-/**
- * Collection of convenience functions
- */
+/** FlinkTools contains a set of convenience functions for Flink's machine learning library:
+ *
+ * - persist:
+ * Takes up to 5 [[DataSet]]s and file paths. Each [[DataSet]] is written to the specified
+ * path and subsequently re-read from disk. This method can be used to effectively split the
+ * execution graph at the given [[DataSet]]. Writing it to disk triggers its materialization
+ * and specifying it as a source will prevent the re-execution of it.
+ */
object FlinkTools {
- /**
- *
- * @param dataset
- * @param path
- * @tparam T
- * @return
- */
+ /** Writes a [[DataSet]] to the specified path and returns it as a DataSource for subsequent
+ * operations.
+ *
+ * @param dataset [[DataSet]] to write to disk
+ * @param path File path to write dataset to
+ * @tparam T Type of the [[DataSet]] elements
+ * @return [[DataSet]] reading the just written file
+ */
def persist[T: ClassTag: TypeInformation](dataset: DataSet[T], path: String): DataSet[T] = {
val env = dataset.getExecutionEnvironment
val outputFormat = new TypeSerializerOutputFormat[T]
@@ -57,6 +63,17 @@ object FlinkTools {
env.createInput(inputFormat)
}
+ /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+ * subsequent operations.
+ *
+ * @param ds1 First [[DataSet]] to write to disk
+ * @param ds2 Second [[DataSet]] to write to disk
+ * @param path1 Path for ds1
+ * @param path2 Path for ds2
+ * @tparam A Type of the first [[DataSet]]'s elements
+ * @tparam B Type of the second [[DataSet]]'s elements
+ * @return Tuple of [[DataSet]]s reading the just written files
+ */
def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation](ds1: DataSet[A], ds2:
DataSet[B], path1: String, path2: String):(DataSet[A], DataSet[B]) = {
val env = ds1.getExecutionEnvironment
@@ -88,6 +105,20 @@ object FlinkTools {
(env.createInput(if1), env.createInput(if2))
}
+ /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+ * subsequent operations.
+ *
+ * @param ds1 First [[DataSet]] to write to disk
+ * @param ds2 Second [[DataSet]] to write to disk
+ * @param ds3 Third [[DataSet]] to write to disk
+ * @param path1 Path for ds1
+ * @param path2 Path for ds2
+ * @param path3 Path for ds3
+ * @tparam A Type of first [[DataSet]]'s elements
+ * @tparam B Type of second [[DataSet]]'s elements
+ * @tparam C Type of third [[DataSet]]'s elements
+ * @return Tuple of [[DataSet]]s reading the just written files
+ */
def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation,
C: ClassTag: TypeInformation](ds1: DataSet[A], ds2: DataSet[B], ds3: DataSet[C], path1:
String, path2: String, path3: String): (DataSet[A], DataSet[B], DataSet[C]) = {
@@ -131,6 +162,23 @@ object FlinkTools {
(env.createInput(if1), env.createInput(if2), env.createInput(if3))
}
+ /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+ * subsequent operations.
+ *
+ * @param ds1 First [[DataSet]] to write to disk
+ * @param ds2 Second [[DataSet]] to write to disk
+ * @param ds3 Third [[DataSet]] to write to disk
+ * @param ds4 Fourth [[DataSet]] to write to disk
+ * @param path1 Path for ds1
+ * @param path2 Path for ds2
+ * @param path3 Path for ds3
+ * @param path4 Path for ds4
+ * @tparam A Type of first [[DataSet]]'s elements
+ * @tparam B Type of second [[DataSet]]'s elements
+ * @tparam C Type of third [[DataSet]]'s elements
+ * @tparam D Type of fourth [[DataSet]]'s elements
+ * @return Tuple of [[DataSet]]s reading the just written files
+ */
def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation,
C: ClassTag: TypeInformation, D: ClassTag: TypeInformation](ds1: DataSet[A], ds2: DataSet[B],
ds3: DataSet[C], ds4: DataSet[D],
@@ -188,6 +236,26 @@ object FlinkTools {
(env.createInput(if1), env.createInput(if2), env.createInput(if3), env.createInput(if4))
}
+ /** Writes multiple [[DataSet]]s to the specified paths and returns them as DataSources for
+ * subsequent operations.
+ *
+ * @param ds1 First [[DataSet]] to write to disk
+ * @param ds2 Second [[DataSet]] to write to disk
+ * @param ds3 Third [[DataSet]] to write to disk
+ * @param ds4 Fourth [[DataSet]] to write to disk
+ * @param ds5 Fifth [[DataSet]] to write to disk
+ * @param path1 Path for ds1
+ * @param path2 Path for ds2
+ * @param path3 Path for ds3
+ * @param path4 Path for ds4
+ * @param path5 Path for ds5
+ * @tparam A Type of first [[DataSet]]'s elements
+ * @tparam B Type of second [[DataSet]]'s elements
+ * @tparam C Type of third [[DataSet]]'s elements
+ * @tparam D Type of fourth [[DataSet]]'s elements
+ * @tparam E Type of fifth [[DataSet]]'s elements
+ * @return Tuple of [[DataSet]]s reading the just written files
+ */
def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation,
C: ClassTag: TypeInformation, D: ClassTag: TypeInformation, E: ClassTag: TypeInformation]
(ds1: DataSet[A], ds2: DataSet[B], ds3: DataSet[C], ds4: DataSet[D], ds5: DataSet[E], path1:
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
index 3c4a257..f3d6172 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/LabeledVector.scala
@@ -20,4 +20,10 @@ package org.apache.flink.ml.common
import org.apache.flink.ml.math.Vector
+/** This class represents a vector with an associated label as it is required for many supervised
+ * learning tasks.
+ *
+ * @param vector Data point
+ * @param label Label of the data point
+ */
case class LabeledVector(vector: Vector, label: Double) {}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
index 0d56dc8..c8082c7 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
@@ -19,8 +19,20 @@
package org.apache.flink.ml.common
import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.WithParameters
+/** Base trait for an algorithm which trains a model based on some training data
+ *
+ * The idea is that all algorithms which train a model implement this trait. That way
+ * they can be chained with [[Transformer]] which act as a preprocessing step for the actual
+ * learning. In that sense, [[Learner]] denote the end of a pipeline and cannot be further
+ * chained.
+ *
+ * Every learner has to implement the `fit` method which takes the training data and learns
+ * a model from the data.
+ *
+ * @tparam IN Type of the training data
+ * @tparam OUT Type of the trained model
+ */
trait Learner[IN, OUT] extends WithParameters {
def fit(input: DataSet[IN], parameters: ParameterMap = ParameterMap.Empty): OUT
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
index 76abc62..02d63cf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
@@ -20,14 +20,24 @@ package org.apache.flink.ml.common
import org.apache.flink.api.scala.DataSet
-/**
- * A transformer represents
- *
- * @tparam IN Type of incoming elements
- * @tparam OUT Type of outgoing elements
- */
+/** Base trait for an algorithm which transforms the input data to some output data.
+ *
+ * A [[Transformer]] is used to transform input data to some output data. Transformations might
+ * be feature extractors, feature mappings, whitening or centralization just to name a few.
+ *
+ * [[Transformer]] can be chained with other [[Transformer]] creating a [[ChainedTransformer]],
+ * which again can be chained. Chaining a [[Transformer]] with a [[Learner]] creates a
+ * [[ChainedLearner]] which terminates a pipeline.
+ *
+ * A [[Transformer]] implementation has to implement the method `transform`, which defines how
+ * the input data is transformed into the output data.
+ *
+ * @tparam IN Type of incoming elements
+ * @tparam OUT Type of outgoing elements
+ */
trait Transformer[IN, OUT] extends WithParameters {
- def chain[CHAINED](transformer: Transformer[OUT, CHAINED]): ChainedTransformer[IN, OUT, CHAINED] = {
+ def chain[CHAINED](transformer: Transformer[OUT, CHAINED]):
+ ChainedTransformer[IN, OUT, CHAINED] = {
new ChainedTransformer[IN, OUT, CHAINED](this, transformer)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
index 632ded6..04f698e 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/feature/PolynomialBase.scala
@@ -28,13 +28,13 @@ import org.apache.flink.api.scala._
/** Maps a vector into the polynomial feature space.
*
* This transformer takes a a vector of values `(x, y, z, ...)` and maps it into the
- * polynomial feature space of degree `n`. That is to say, it calculates the following
+ * polynomial feature space of degree `d`. That is to say, it calculates the following
* representation:
*
* `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T`
*
* This transformer can be prepended to all [[Transformer]] and
- * [[org.apache.flink.ml.commonLearner]] implementations which expect an input of
+ * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
* [[LabeledVector]].
*
* @example
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
index 8e0eed0..d407a70 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
@@ -60,6 +60,15 @@ case class DenseVector(val values: Array[Double]) extends Vector {
case _ => false
}
}
+
+ /**
+ * Copies the vector instance
+ *
+ * @return Copy of the vector instance
+ */
+ override def copy: Vector = {
+ DenseVector(values.clone())
+ }
}
object DenseVector {
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala
deleted file mode 100644
index 5d6eca4..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/JBlas.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import org.jblas.DoubleMatrix
-
-/**
- * Convenience functions for the interaction with JBlas. If you want to use JBlas and allow an
- * easy transition from Flink's matrix abstraction to JBlas's and vice versa, simply import
- * all elements contained in the JBlas object.
- */
-object JBlas {
-
- /**
- * Implicit conversion from Flink's [[DenseMatrix]] to JBlas's [[DoubleMatrix]]
- *
- * @param matrix DenseMatrix to be converted
- * @return DoubleMatrix resulting from the given matrix
- */
- implicit def denseMatrix2JBlas(matrix: DenseMatrix): DoubleMatrix = {
- new DoubleMatrix(matrix.numRows, matrix.numCols, matrix.values: _*)
- }
-
- /**
- * Implicit class to extends [[DoubleMatrix]] such that Flink's [[DenseMatrix]] and
- * [[DenseVector]] can easily be retrieved from.
- * @param matrix
- */
- implicit class RichDoubleMatrix(matrix: DoubleMatrix) {
- def fromJBlas: DenseMatrix = DenseMatrix(matrix.rows, matrix.columns, matrix.data)
-
- def fromJBlas2Vector: DenseVector = {
- require(matrix.columns == 1, "The JBlas matrix contains more than 1 column.")
-
- DenseVector(matrix.data)
- }
- }
-
- /**
- * Implicit conversion from Flink's [[Vector]] to JBlas's [[DoubleMatrix]]
- *
- * @param vector Vector to be converted
- * @return DoubleMatrix resulting from the given vector
- */
- implicit def vector2JBlas(vector: Vector): DoubleMatrix = {
- vector match {
- case x: DenseVector => denseVector2JBlas(x)
- }
- }
-
- private def denseVector2JBlas(vector: DenseVector): DoubleMatrix = {
- new DoubleMatrix(vector.size, 1, vector.values: _*)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
index ddda003..20d820c 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
@@ -36,4 +36,11 @@ trait Vector {
* @return element with index
*/
def apply(index: Int): Double
+
+ /**
+ * Copies the vector instance
+ *
+ * @return Copy of the vector instance
+ */
+ def copy: Vector
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
index fce008a..e82e38f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
@@ -19,7 +19,8 @@
package org.apache.flink.ml
/**
- * Convenience to handle Flink's [[org.apache.flink.ml.math.Matrix]] and [[Vector]] abstraction.
+ * Convenience methods to handle Flink's [[org.apache.flink.ml.math.Matrix]] and [[Vector]]
+ * abstraction.
*/
package object math {
implicit class RichMatrix(matrix: Matrix) extends Iterable[Double] {
@@ -38,4 +39,10 @@ package object math {
}
}
}
+
+ implicit def vector2Array(vector: Vector): Array[Double] = {
+ vector match {
+ case dense: DenseVector => dense.values
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index 1051ae5..5ff59d1 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -13,11 +13,12 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.flink.ml.recommendation
-import java.lang
+import java.{util, lang}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order
@@ -28,7 +29,9 @@ import org.apache.flink.types.Value
import org.apache.flink.util.Collector
import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction}
-import org.jblas.{Solve, SimpleBlas, DoubleMatrix}
+import com.github.fommil.netlib.BLAS.{ getInstance => blas }
+import com.github.fommil.netlib.LAPACK.{ getInstance => lapack }
+import org.netlib.util.intW
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -43,21 +46,23 @@ import scala.util.Random
* column of the item matrix is `v_i`. The matrix `R` is called the ratings matrix and
* `(R)_{i,j} = r_{i,j}`.
*
- * In order to find the user and item matrix the following problem is solved:
+ * In order to find the user and item matrix, the following problem is solved:
*
* `argmin_{U,V} sum_(i,j\ with\ r_{i,j} != 0) (r_{i,j} - u_{i}^Tv_{j})^2 +
* \lambda (sum_(i) n_{u_i} ||u_i||^2 + sum_(j) n_{v_j} ||v_j||^2)`
*
- * Overfitting is avoided by using a weighted-lambda-regularization scheme.
+ * with `\lambda` being the regularization factor, `n_{u_i}` being the number of items the user `i`
+ * has rated and `n_{v_j}` being the number of times the item `j` has been rated. This
+ * regularization scheme to avoid overfitting is called weighted-lambda-regularization. Details
+ * can be found in the work of [[http://dx.doi.org/10.1007/978-3-540-68880-8_32 Zhou et al.]].
*
* By fixing one of the matrices `U` or `V` one obtains a quadratic form which can be solved. The
* solution of the modified problem is guaranteed to decrease the overall cost function. By
* applying this step alternately to the matrices `U` and `V`, we can iteratively improve the
- * overall solution. Details can be found in the work of
- * [[http://dx.doi.org/10.1007/978-3-540-68880-8_32 Zhou et al.]].
+ * matrix factorization.
*
* The matrix `R` is given in its sparse representation as a tuple of `(i, j, r)` where `i` is the
- * row index, `j` is the column index and `r` is the matrix a position `(i,j)`.
+ * row index, `j` is the column index and `r` is the matrix value at position `(i,j)`.
*
* @example
* {{{
@@ -68,7 +73,7 @@ import scala.util.Random
* .setIterations(10)
* .setNumFactors(10)
*
- * val model = als.fit(inputDS))
+ * val model = als.fit(inputDS)
*
* val data2Predict: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
*
@@ -79,20 +84,23 @@ import scala.util.Random
*
* - [[ALS.NumFactors]]:
* The number of latent factors. It is the dimension of the calculated user and item vectors.
+ * (Default value: '''10''')
*
* - [[ALS.Lambda]]:
* Regularization factor. Tune this value in order to avoid overfitting/generalization.
+ * (Default value: '''1''')
*
- * - [[ALS.Iterations]]: The number of iterations to perform.
+ * - [[ALS.Iterations]]: The number of iterations to perform. (Default value: '''10''')
*
* - [[ALS.Blocks]]:
 * The number of blocks into which the user and item matrix are grouped. The fewer
* blocks one uses, the less data is sent redundantly. However, bigger blocks entail bigger
* update messages which have to be stored on the Heap. If the algorithm fails because of
- * an OutOfMemoryException, then try to increase the number of blocks.
+ * an OutOfMemoryException, then try to increase the number of blocks. (Default value: '''None''')
*
* - [[ALS.Seed]]:
- * Random seed used to generate the initial item matrix for the algorithm
+ * Random seed used to generate the initial item matrix for the algorithm.
+ * (Default value: '''0''')
*
* - [[ALS.TemporaryPath]]:
* Path to a temporary directory into which intermediate results are stored. If
@@ -103,7 +111,7 @@ import scala.util.Random
* the individual steps are stored in the specified directory. By splitting the algorithm
* into multiple smaller steps, Flink does not have to split the available memory amongst too many
* operators. This allows the system to process bigger individual messasges and improves the
- * overall performance.
+ * overall performance. (Default value: '''None''')
*
* The ALS implementation is based on Spark's MLLib implementation of ALS which you can find
* [[https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/
@@ -318,10 +326,10 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
// in order to save space, store only the upper triangle of the XtX matrix
val triangleSize = (factors*factors - factors)/2 + factors
- val matrix = DoubleMatrix.zeros(triangleSize)
- val fullMatrix = DoubleMatrix.zeros(factors, factors)
- val userXtX = new ArrayBuffer[DoubleMatrix]()
- val userXy = new ArrayBuffer[DoubleMatrix]()
+ val matrix = Array.fill(triangleSize)(0.0)
+ val fullMatrix = Array.fill(factors * factors)(0.0)
+ val userXtX = new ArrayBuffer[Array[Double]]()
+ val userXy = new ArrayBuffer[Array[Double]]()
val numRatings = new ArrayBuffer[Int]()
override def coGroup(left: lang.Iterable[(Int, Int, Array[Array[Double]])],
@@ -341,8 +349,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
val oldLength = userXtX.length
while(i < (numUsers - oldLength)) {
- userXtX += DoubleMatrix.zeros(triangleSize)
- userXy += DoubleMatrix.zeros(factors)
+ userXtX += Array.fill(triangleSize)(0.0)
+ userXy += Array.fill(factors)(0.0)
numRatings.+=(0)
i += 1
@@ -356,8 +364,9 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
i = 0
while(i < matricesToClear){
numRatings(i) = 0
- userXtX(i).fill(0.0f)
- userXy(i).fill(0.0f)
+
+ util.Arrays.fill(userXtX(i), 0.0)
+ util.Arrays.fill(userXy(i), 0.0)
i += 1
}
@@ -372,7 +381,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
var p = 0
while(p < blockFactors.length){
- val vector = new DoubleMatrix(blockFactors(p))
+ val vector = blockFactors(p)
+
outerProduct(vector, matrix, factors)
val (users, ratings) = inInfo.ratingsForBlock(itemBlock)(p)
@@ -380,8 +390,8 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
var i = 0
while (i < users.length) {
numRatings(users(i)) += 1
- userXtX(users(i)).addi(matrix)
- SimpleBlas.axpy(ratings(i), vector, userXy(users(i)))
+ blas.daxpy(matrix.length, 1, matrix, 1, userXtX(users(i)), 1)
+ blas.daxpy(vector.length, ratings(i), vector, 1, userXy(users(i)), 1)
i += 1
}
@@ -401,12 +411,14 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
// add regularization constant
while(f < factors){
- fullMatrix.data(f*factors + f) += lambda * numRatings(i)
+ fullMatrix(f*factors + f) += lambda * numRatings(i)
f += 1
}
// calculate new user vector
- array(i) = Solve.solvePositive(fullMatrix, userXy(i)).data
+ val result = new intW(0)
+ lapack.dposv("U", factors, 1, fullMatrix, factors , userXy(i), factors, result)
+ array(i) = userXy(i)
i += 1
}
@@ -696,16 +708,13 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
// ================================ Math helper functions ========================================
- def outerProduct(vector: DoubleMatrix, matrix: DoubleMatrix, factors: Int): Unit = {
- val vd = vector.data
- val md = matrix.data
-
+ def outerProduct(vector: Array[Double], matrix: Array[Double], factors: Int): Unit = {
var row = 0
var pos = 0
while(row < factors){
var col = 0
while(col <= row){
- md(pos) = vd(row) * vd(col)
+ matrix(pos) = vector(row) * vector(col)
col += 1
pos += 1
}
@@ -714,24 +723,22 @@ class ALS extends Learner[(Int, Int, Double), ALSModel] with Serializable {
}
}
- def generateFullMatrix(triangularMatrix: DoubleMatrix, fullMatrix: DoubleMatrix,
+ def generateFullMatrix(triangularMatrix: Array[Double], fullMatrix: Array[Double],
factors: Int): Unit = {
var row = 0
var pos = 0
- val fmd = fullMatrix.data
- val tmd = triangularMatrix.data
while(row < factors){
var col = 0
while(col < row){
- fmd(row*factors + col) = tmd(pos)
- fmd(col*factors + row) = tmd(pos)
+ fullMatrix(row*factors + col) = triangularMatrix(pos)
+ fullMatrix(col*factors + row) = triangularMatrix(pos)
pos += 1
col += 1
}
- fmd(row*factors + row) = tmd(pos)
+ fullMatrix(row*factors + row) = triangularMatrix(pos)
pos += 1
row += 1
@@ -893,7 +900,8 @@ object ALS {
 * @param itemFactors Calculated item matrix
* @param lambda Regularization value used to calculate the model
*/
-class ALSModel(@transient val userFactors: DataSet[Factors],@transient val itemFactors: DataSet[Factors],
+class ALSModel(@transient val userFactors: DataSet[Factors],
+ @transient val itemFactors: DataSet[Factors],
val lambda: Double) extends Transformer[(Int, Int), (Int, Int, Double)] with
Serializable{
@@ -905,10 +913,10 @@ Serializable{
triple => {
val (((uID, iID), uFactors), iFactors) = triple
- val uFactorsVector = new DoubleMatrix(uFactors.factors)
- val iFactorsVector = new DoubleMatrix(iFactors.factors)
+ val uFactorsVector = uFactors.factors
+ val iFactorsVector = iFactors.factors
- val prediction = SimpleBlas.dot(uFactorsVector, iFactorsVector)
+ val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
(uID, iID, prediction)
}
@@ -925,13 +933,13 @@ Serializable{
triple => {
val (((uID, iID), uFactors), iFactors) = triple
- val uFactorsVector = new DoubleMatrix(uFactors.factors)
- val iFactorsVector = new DoubleMatrix(iFactors.factors)
+ val uFactorsVector = uFactors.factors
+ val iFactorsVector = iFactors.factors
- val squaredUNorm2 = uFactorsVector.dot(uFactorsVector)
- val squaredINorm2 = iFactorsVector.dot(iFactorsVector)
+ val squaredUNorm2 = blas.ddot(uFactorsVector.length, uFactorsVector, 1, uFactorsVector, 1)
+ val squaredINorm2 = blas.ddot(iFactorsVector.length, iFactorsVector, 1, iFactorsVector, 1)
- val prediction = SimpleBlas.dot(uFactorsVector, iFactorsVector)
+ val prediction = blas.ddot(uFactorsVector.length, uFactorsVector, 1, iFactorsVector, 1)
(uID, iID, prediction, squaredUNorm2, squaredINorm2)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
index 523d132..8060d2b 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
@@ -23,11 +23,10 @@ import org.apache.flink.api.scala.DataSet
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.JBlas._
import org.apache.flink.api.scala._
-import org.jblas.{SimpleBlas, DoubleMatrix}
+import com.github.fommil.netlib.BLAS.{ getInstance => blas }
/** Multiple linear regression using the ordinary least squares (OLS) estimator.
*
@@ -77,8 +76,9 @@ import org.jblas.{SimpleBlas, DoubleMatrix}
*
* - [[MultipleLinearRegression.Stepsize]]:
* Initial step size for the gradient descent method.
- * This value controls how far the gradient descent method moves in the opposite direction of the gradient.
- * Tuning this parameter might be crucial to make it stable and to obtain a better performance.
+ * This value controls how far the gradient descent method moves in the opposite direction of the
+ * gradient. Tuning this parameter might be crucial to make it stable and to obtain a better
+ * performance.
*
* - [[MultipleLinearRegression.ConvergenceThreshold]]:
* Threshold for relative change of sum of squared residuals until convergence.
@@ -113,7 +113,11 @@ with Serializable {
val convergenceThreshold = map.get(ConvergenceThreshold)
// calculate dimension of the feature vectors
- val dimension = input.map{_.vector.size}.reduce { math.max(_, _) }
+ val dimension = input.map{_.vector.size}.reduce {
+ (a, b) =>
+ require(a == b, "All input vector must have the same dimension.")
+ a
+ }
// initial weight vector is set to 0
val initialWeightVector = createInitialWeightVector(dimension)
@@ -150,7 +154,9 @@ with Serializable {
val (leftBetas, leftBeta0, leftCount) = left
val (rightBetas, rightBeta0, rightCount) = right
- (leftBetas.add(rightBetas), leftBeta0 + rightBeta0, leftCount + rightCount)
+ blas.daxpy(leftBetas.length, 1.0, rightBetas, 1, leftBetas, 1)
+
+ (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
}.map {
new LinearRegressionWeightsUpdate(stepsize)
}.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
@@ -197,7 +203,8 @@ with Serializable {
val (leftBetas, leftBeta0, leftCount) = left
val (rightBetas, rightBeta0, rightCount) = right
- (leftBetas.add(rightBetas), leftBeta0 + rightBeta0, leftCount + rightCount)
+ blas.daxpy(leftBetas.length, 1, rightBetas, 1, leftBetas, 1)
+ (leftBetas, leftBeta0 + rightBeta0, leftCount + rightCount)
}.map {
new LinearRegressionWeightsUpdate(stepsize)
}.withBroadcastSet(weightVector, WEIGHTVECTOR_BROADCAST)
@@ -216,11 +223,11 @@ with Serializable {
* @return DataSet of a zero vector of dimension d
*/
private def createInitialWeightVector(dimensionDS: DataSet[Int]):
- DataSet[(DoubleMatrix, Double)] = {
+ DataSet[(Array[Double], Double)] = {
dimensionDS.map {
dimension =>
val values = Array.fill(dimension)(0.0)
- (new DoubleMatrix(dimension, 1, values: _*), 0.0)
+ (values, 0.0)
}
}
}
@@ -261,13 +268,13 @@ object MultipleLinearRegression {
private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] {
import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
- var weightVector: DoubleMatrix = null
+ var weightVector: Array[Double] = null
var weight0: Double = 0.0
@throws(classOf[Exception])
override def open(configuration: Configuration): Unit = {
val list = this.getRuntimeContext.
- getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+ getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
val weightsPair = list.get(0)
@@ -279,7 +286,9 @@ private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] {
val vector = value.vector
val label = value.label
- val residual = weightVector.dot(vector) + weight0 - label
+ val dotProduct = blas.ddot(weightVector.length, weightVector, 1, vector, 1)
+
+ val residual = dotProduct + weight0 - label
residual*residual
}
@@ -294,17 +303,17 @@ private class SquaredResiduals extends RichMapFunction[LabeledVector, Double] {
* The weight vector is received as a broadcast variable.
*/
private class LinearRegressionGradientDescent extends
-RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] {
+RichMapFunction[LabeledVector, (Array[Double], Double, Int)] {
import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
- var weightVector: DoubleMatrix = null
+ var weightVector: Array[Double] = null
var weight0: Double = 0.0
@throws(classOf[Exception])
override def open(configuration: Configuration): Unit = {
val list = this.getRuntimeContext.
- getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+ getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
val weightsPair = list.get(0)
@@ -312,13 +321,19 @@ RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] {
weight0 = weightsPair._2
}
- override def map(value: LabeledVector): (DoubleMatrix, Double, Int) = {
+ override def map(value: LabeledVector): (Array[Double], Double, Int) = {
val x = value.vector
val label = value.label
- val error = weightVector.dot(x) + weight0 - label
+ val dotProduct = blas.ddot(weightVector.length, weightVector, 1, x, 1)
+
+ val error = dotProduct + weight0 - label
+
+ // reuse vector x
+ val weightsGradient = x
+
+ blas.dscal(weightsGradient.length, 2*error, weightsGradient, 1)
- val weightsGradient = x.mul(2 * error)
val weight0Gradient = 2 * error
(weightsGradient, weight0Gradient, 1)
@@ -332,17 +347,17 @@ RichMapFunction[LabeledVector, (DoubleMatrix, Double, Int)] {
* @param stepsize Initial value of the step size used to update the weight vector
*/
private class LinearRegressionWeightsUpdate(val stepsize: Double) extends
-RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
+RichMapFunction[(Array[Double], Double, Int), (Array[Double], Double)] {
import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
- var weights: DoubleMatrix = null
+ var weights: Array[Double] = null
var weight0: Double = 0.0
@throws(classOf[Exception])
override def open(configuration: Configuration): Unit = {
val list = this.getRuntimeContext.
- getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+ getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
val weightsPair = list.get(0)
@@ -350,8 +365,10 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
weight0 = weightsPair._2
}
- override def map(value: (DoubleMatrix, Double, Int)): (DoubleMatrix, Double) = {
- val weightsGradient = value._1.div(value._3)
+ override def map(value: (Array[Double], Double, Int)): (Array[Double], Double) = {
+ val weightsGradient = value._1
+ blas.dscal(weightsGradient.length, 1.0/value._3, weightsGradient, 1)
+
val weight0Gradient = value._2 / value._3
val iteration = getIterationRuntimeContext.getSuperstepNumber
@@ -360,9 +377,8 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
// decreasing
val effectiveStepsize = stepsize/math.sqrt(iteration)
- val newWeights = new DoubleMatrix(weights.rows, weights.columns)
- newWeights.copy(weights)
- SimpleBlas.axpy( -effectiveStepsize, weightsGradient, newWeights)
+ val newWeights = weights.clone
+ blas.daxpy(newWeights.length, -effectiveStepsize, weightsGradient, 1, newWeights, 1)
val newWeight0 = weight0 - effectiveStepsize * weight0Gradient
(newWeights, newWeight0)
@@ -383,7 +399,7 @@ RichMapFunction[(DoubleMatrix, Double, Int), (DoubleMatrix, Double)] {
* @param weights DataSet containing the calculated weight vector
*/
class MultipleLinearRegressionModel private[regression]
-(val weights: DataSet[(DoubleMatrix, Double)]) extends
+(val weights: DataSet[(Array[Double], Double)]) extends
Transformer[ Vector, LabeledVector ] {
import MultipleLinearRegression.WEIGHTVECTOR_BROADCAST
@@ -403,13 +419,14 @@ Transformer[ Vector, LabeledVector ] {
}
private class LinearRegressionPrediction extends RichMapFunction[Vector, LabeledVector] {
- private var weights: DoubleMatrix = null
+ private var weights: Array[Double] = null
private var weight0: Double = 0
@throws(classOf[Exception])
override def open(configuration: Configuration): Unit = {
- val t = getRuntimeContext.getBroadcastVariable[(DoubleMatrix, Double)](WEIGHTVECTOR_BROADCAST)
+ val t = getRuntimeContext
+ .getBroadcastVariable[(Array[Double], Double)](WEIGHTVECTOR_BROADCAST)
val weightsPair = t.get(0)
@@ -418,7 +435,9 @@ Transformer[ Vector, LabeledVector ] {
}
override def map(value: Vector): LabeledVector = {
- val prediction = weights.dot(value) + weight0
+ val dotProduct = blas.ddot(weights.length, weights, 1, value, 1)
+
+ val prediction = dotProduct + weight0
LabeledVector(value, prediction)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
new file mode 100644
index 0000000..28fdfa6
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseITCase.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class PolynomialBaseITCase extends ShouldMatchers {
+
+ @Test
+ def testMapElementToPolynomialVectorSpace (): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setDegreeOfParallelism (2)
+
+ val input = Seq (
+ LabeledVector (DenseVector (1), 1.0),
+ LabeledVector (DenseVector (2), 2.0)
+ )
+
+ val inputDS = env.fromCollection (input)
+
+ val transformer = PolynomialBase ()
+ .setDegree (3)
+
+ val transformedDS = transformer.transform (inputDS)
+
+ val expectedMap = List (
+ (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
+ (2.0 -> DenseVector (8.0, 4.0, 2.0) )
+ ) toMap
+
+ val result = transformedDS.collect
+
+ for (entry <- result) {
+ expectedMap.contains (entry.label) should be (true)
+ entry.vector should equal (expectedMap (entry.label) )
+ }
+ }
+
+ @Test
+ def testMapVectorToPolynomialVectorSpace(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setDegreeOfParallelism(2)
+
+ val input = Seq(
+ LabeledVector(DenseVector(2, 3), 1.0),
+ LabeledVector(DenseVector(2, 3, 4), 2.0)
+ )
+
+ val expectedMap = List(
+ (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
+ (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
+ 9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
+ ) toMap
+
+ val inputDS = env.fromCollection(input)
+
+ val transformer = PolynomialBase()
+ .setDegree(3)
+
+ val transformedDS = transformer.transform(inputDS)
+
+ val result = transformedDS.collect
+
+ for(entry <- result) {
+ expectedMap.contains(entry.label) should be(true)
+ entry.vector should equal(expectedMap(entry.label))
+ }
+ }
+
+ @Test
+ def testReturnEmptyVectorIfDegreeIsZero(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setDegreeOfParallelism(2)
+
+ val input = Seq(
+ LabeledVector(DenseVector(2, 3), 1.0),
+ LabeledVector(DenseVector(2, 3, 4), 2.0)
+ )
+
+ val inputDS = env.fromCollection(input)
+
+ val transformer = PolynomialBase()
+ .setDegree(0)
+
+ val transformedDS = transformer.transform(inputDS)
+
+ val result = transformedDS.collect
+
+ val expectedMap = List(
+ (1.0 -> DenseVector()),
+ (2.0 -> DenseVector())
+ ) toMap
+
+ for(entry <- result) {
+ expectedMap.contains(entry.label) should be(true)
+ entry.vector should equal(expectedMap(entry.label))
+ }
+ }
+}
+
+object PolynomialBaseITCase {
+ @BeforeClass
+ def setup(): Unit = {
+ CliFrontendTestUtils.pipeSystemOutToNull()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala
deleted file mode 100644
index 8da822f..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialBaseSuite.scala
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.feature
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.DenseVector
-import org.scalatest.{ShouldMatchers, FlatSpec}
-
-import org.apache.flink.api.scala._
-
-class PolynomialBaseSuite extends FlatSpec with ShouldMatchers {
- behavior of "A PolynomialBase"
-
- it should "map an element into a polynomial vector space" in {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input = Seq(
- LabeledVector(DenseVector(1), 1.0),
- LabeledVector(DenseVector(2), 2.0)
- )
-
- val inputDS = env.fromCollection(input)
-
- val transformer = PolynomialBase()
- .setDegree(3)
-
- val transformedDS = transformer.transform(inputDS)
-
- val expectedMap = List(
- (1.0 -> DenseVector(1.0, 1.0, 1.0)),
- (2.0 -> DenseVector(8.0, 4.0, 2.0))
- ) toMap
-
- val result = transformedDS.collect
-
- for(entry <- result) {
- expectedMap.contains(entry.label) should be(true)
- entry.vector should equal(expectedMap(entry.label))
- }
-
- }
-
- it should "map a vector into a polynomial vector space" in {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input = Seq(
- LabeledVector(DenseVector(2, 3), 1.0),
- LabeledVector(DenseVector(2, 3, 4), 2.0)
- )
-
- val expectedMap = List(
- (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
- (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
- 9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
- ) toMap
-
- val inputDS = env.fromCollection(input)
-
- val transformer = PolynomialBase()
- .setDegree(3)
-
- val transformedDS = transformer.transform(inputDS)
-
- val result = transformedDS.collect
-
- for(entry <- result) {
- expectedMap.contains(entry.label) should be(true)
- entry.vector should equal(expectedMap(entry.label))
- }
-
- println(result)
- }
-
- it should "return an empty vector if the polynomial degree is set to 0" in {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val input = Seq(
- LabeledVector(DenseVector(2, 3), 1.0),
- LabeledVector(DenseVector(2, 3, 4), 2.0)
- )
-
- val inputDS = env.fromCollection(input)
-
- val transformer = PolynomialBase()
- .setDegree(0)
-
- val transformedDS = transformer.transform(inputDS)
-
- val result = transformedDS.collect
-
- val expectedMap = List(
- (1.0 -> DenseVector()),
- (2.0 -> DenseVector())
- ) toMap
-
- for(entry <- result) {
- expectedMap.contains(entry.label) should be(true)
- entry.vector should equal(expectedMap(entry.label))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
new file mode 100644
index 0000000..f2c52d3
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITCase.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.recommendation
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class ALSITCase extends ShouldMatchers {
+
+ @Test
+ def testMatrixFactorization(): Unit = {
+ import ALSData._
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setDegreeOfParallelism(2)
+
+ val als = ALS()
+ .setIterations(iterations)
+ .setLambda(lambda)
+ .setBlocks(4)
+ .setNumFactors(numFactors)
+
+ val inputDS = env.fromCollection(data)
+
+ val model = als.fit(inputDS)
+
+ val testData = env.fromCollection(expectedResult.map{
+ case (userID, itemID, rating) => (userID, itemID)
+ })
+
+ val predictions = model.transform(testData).collect
+
+ predictions.length should equal(expectedResult.length)
+
+ val resultMap = expectedResult map {
+ case (uID, iID, value) => (uID, iID) -> value
+ } toMap
+
+ predictions foreach {
+ case (uID, iID, value) => {
+ resultMap.isDefinedAt(((uID, iID))) should be(true)
+
+ value should be(resultMap((uID, iID)) +- 0.1)
+ }
+ }
+
+ val risk = model.empiricalRisk(inputDS).collect(0)
+
+ risk should be(expectedEmpiricalRisk +- 1)
+ }
+}
+
+object ALSITCase {
+
+ @BeforeClass
+ def setup(): Unit = {
+ CliFrontendTestUtils.pipeSystemOutToNull()
+ }
+}
+
+object ALSData {
+
+ val iterations = 9
+ val lambda = 1.0
+ val numFactors = 5
+
+ val data: Seq[(Int, Int, Double)] = {
+ Seq(
+ (2,13,534.3937734561154),
+ (6,14,509.63176469621936),
+ (4,14,515.8246770897443),
+ (7,3,495.05234565105),
+ (2,3,532.3281786219485),
+ (5,3,497.1906356844367),
+ (3,3,512.0640508585093),
+ (10,3,500.2906742233019),
+ (1,4,521.9189079662882),
+ (2,4,515.0734651491396),
+ (1,7,522.7532725967008),
+ (8,4,492.65683825096403),
+ (4,8,492.65683825096403),
+ (10,8,507.03319667905413),
+ (7,1,522.7532725967008),
+ (1,1,572.2230209271174),
+ (2,1,563.5849190220224),
+ (6,1,518.4844061038742),
+ (9,1,529.2443732217674),
+ (8,1,543.3202505434103),
+ (7,2,516.0188923307859),
+ (1,2,563.5849190220224),
+ (1,11,515.1023793011227),
+ (8,2,536.8571133978352),
+ (2,11,507.90776961762225),
+ (3,2,532.3281786219485),
+ (5,11,476.24185144363304),
+ (4,2,515.0734651491396),
+ (4,11,469.92049343738233),
+ (3,12,509.4713776280098),
+ (4,12,494.6533165132021),
+ (7,5,482.2907867916308),
+ (6,5,477.5940040923741),
+ (4,5,480.9040684364228),
+ (1,6,518.4844061038742),
+ (6,6,470.6605085832807),
+ (8,6,489.6360564705307),
+ (4,6,472.74052954447046),
+ (7,9,482.5837650471611),
+ (5,9,487.00175463269863),
+ (9,9,500.69514584780944),
+ (4,9,477.71644808419325),
+ (7,10,485.3852917539852),
+ (8,10,507.03319667905413),
+ (3,10,500.2906742233019),
+ (5,15,488.08215944254437),
+ (6,15,480.16929757607346)
+ )
+ }
+
+ val expectedResult: Seq[(Int, Int, Double)] = {
+ Seq(
+ (2, 2, 526.1037),
+ (5, 9, 468.5680),
+ (10, 3, 484.8975),
+ (5, 13, 451.6228),
+ (1, 15, 493.4956),
+ (4, 11, 456.3862)
+ )
+ }
+
+ val expectedEmpiricalRisk = 505374.1877
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
deleted file mode 100644
index 770d4d2..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSSuite.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.recommendation
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.scalatest.{ShouldMatchers, FlatSpec}
-
-import org.apache.flink.api.scala._
-
-class ALSSuite extends FlatSpec with ShouldMatchers {
-
- behavior of "ALS"
-
- it should "factorize a given matrix" in {
- import ALSData._
-
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val als = ALS()
- .setIterations(iterations)
- .setLambda(lambda)
- .setBlocks(4)
- .setNumFactors(numFactors)
-
- val inputDS = env.fromCollection(data)
-
- val model = als.fit(inputDS)
-
- val testData = env.fromCollection(expectedResult.map{
- case (userID, itemID, rating) => (userID, itemID)
- })
-
- val predictions = model.transform(testData).collect
-
- predictions.length should equal(expectedResult.length)
-
- val resultMap = expectedResult map {
- case (uID, iID, value) => (uID, iID) -> value
- } toMap
-
- predictions foreach {
- case (uID, iID, value) => {
- resultMap.isDefinedAt(((uID, iID))) should be(true)
-
- value should be(resultMap((uID, iID)) +- 0.1)
- }
- }
-
- val risk = model.empiricalRisk(inputDS).collect(0)
-
- risk should be(expectedEmpiricalRisk +- 1)
- }
-}
-
-object ALSData {
-
- val iterations = 9
- val lambda = 1.0
- val numFactors = 5
-
- val data: Seq[(Int, Int, Double)] = {
- Seq(
- (2,13,534.3937734561154),
- (6,14,509.63176469621936),
- (4,14,515.8246770897443),
- (7,3,495.05234565105),
- (2,3,532.3281786219485),
- (5,3,497.1906356844367),
- (3,3,512.0640508585093),
- (10,3,500.2906742233019),
- (1,4,521.9189079662882),
- (2,4,515.0734651491396),
- (1,7,522.7532725967008),
- (8,4,492.65683825096403),
- (4,8,492.65683825096403),
- (10,8,507.03319667905413),
- (7,1,522.7532725967008),
- (1,1,572.2230209271174),
- (2,1,563.5849190220224),
- (6,1,518.4844061038742),
- (9,1,529.2443732217674),
- (8,1,543.3202505434103),
- (7,2,516.0188923307859),
- (1,2,563.5849190220224),
- (1,11,515.1023793011227),
- (8,2,536.8571133978352),
- (2,11,507.90776961762225),
- (3,2,532.3281786219485),
- (5,11,476.24185144363304),
- (4,2,515.0734651491396),
- (4,11,469.92049343738233),
- (3,12,509.4713776280098),
- (4,12,494.6533165132021),
- (7,5,482.2907867916308),
- (6,5,477.5940040923741),
- (4,5,480.9040684364228),
- (1,6,518.4844061038742),
- (6,6,470.6605085832807),
- (8,6,489.6360564705307),
- (4,6,472.74052954447046),
- (7,9,482.5837650471611),
- (5,9,487.00175463269863),
- (9,9,500.69514584780944),
- (4,9,477.71644808419325),
- (7,10,485.3852917539852),
- (8,10,507.03319667905413),
- (3,10,500.2906742233019),
- (5,15,488.08215944254437),
- (6,15,480.16929757607346)
- )
- }
-
- val expectedResult: Seq[(Int, Int, Double)] = {
- Seq(
- (2, 2, 526.1037),
- (5, 9, 468.5680),
- (10, 3, 484.8975),
- (5, 13, 451.6228),
- (1, 15, 493.4956),
- (4, 11, 456.3862)
- )
- }
-
- val expectedEmpiricalRisk = 505374.1877
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/21e2d96f/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
new file mode 100644
index 0000000..eb825b9
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITCase.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.regression
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.client.CliFrontendTestUtils
+import org.apache.flink.ml.common.ParameterMap
+import org.apache.flink.ml.feature.PolynomialBase
+import org.junit.{BeforeClass, Test}
+import org.scalatest.ShouldMatchers
+
+import org.apache.flink.api.scala._
+
+class MultipleLinearRegressionITCase extends ShouldMatchers {
+
+ @Test
+ def testEstimationOfLinearFunction(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setDegreeOfParallelism(2)
+
+ val learner = MultipleLinearRegression()
+
+ import RegressionData._
+
+ val parameters = ParameterMap()
+
+ parameters.add(MultipleLinearRegression.Stepsize, 1.0)
+ parameters.add(MultipleLinearRegression.Iterations, 10)
+ parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+ val inputDS = env.fromCollection(data)
+ val model = learner.fit(inputDS, parameters)
+
+ val weightList = model.weights.collect
+
+ weightList.size should equal(1)
+
+ val (weights, weight0) = weightList(0)
+
+ expectedWeights zip weights foreach {
+ case (expectedWeight, weight) =>
+ weight should be (expectedWeight +- 1)
+ }
+ weight0 should be (expectedWeight0 +- 0.4)
+
+ val srs = model.squaredResidualSum(inputDS).collect(0)
+
+ srs should be (expectedSquaredResidualSum +- 2)
+ }
+
+ @Test
+ def testEstimationOfCubicFunction(): Unit = {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ env.setDegreeOfParallelism(2)
+
+ val polynomialBase = PolynomialBase()
+ val learner = MultipleLinearRegression()
+
+ val pipeline = polynomialBase.chain(learner)
+
+ val inputDS = env.fromCollection(RegressionData.polynomialData)
+
+ val parameters = ParameterMap()
+ .add(PolynomialBase.Degree, 3)
+ .add(MultipleLinearRegression.Stepsize, 0.002)
+ .add(MultipleLinearRegression.Iterations, 100)
+
+ val model = pipeline.fit(inputDS, parameters)
+
+ val weightList = model.weights.collect
+
+ weightList.size should equal(1)
+
+ val (weights, weight0) = weightList(0)
+
+ RegressionData.expectedPolynomialWeights.zip(weights) foreach {
+ case (expectedWeight, weight) =>
+ weight should be(expectedWeight +- 0.1)
+ }
+
+ weight0 should be(RegressionData.expectedPolynomialWeight0 +- 0.1)
+
+ val transformedInput = polynomialBase.transform(inputDS, parameters)
+
+ val srs = model.squaredResidualSum(transformedInput).collect(0)
+
+ srs should be(RegressionData.expectedPolynomialSquaredResidualSum +- 5)
+ }
+}
+
+object MultipleLinearRegressionITCase{
+
+ @BeforeClass
+ def setup(): Unit = {
+ CliFrontendTestUtils.pipeSystemOutToNull()
+ }
+}