You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/05/22 10:43:39 UTC
[7/7] flink git commit: [FLINK-2050] [ml] Ports existing ML
algorithms to new pipeline mechanism
[FLINK-2050] [ml] Ports existing ML algorithms to new pipeline mechanism
Adds pipeline comments
Adds pipeline IT case
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e574750
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e574750
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e574750
Branch: refs/heads/master
Commit: 1e574750da55613582aacab5fbdf1a6730d95b96
Parents: fde0341
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 20 13:49:52 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 22 10:42:51 2015 +0200
----------------------------------------------------------------------
docs/libs/ml/als.md | 4 +-
docs/libs/ml/cocoa.md | 6 +-
docs/libs/ml/multiple_linear_regression.md | 4 +-
docs/libs/ml/polynomial_base_feature_mapper.md | 4 +-
docs/libs/ml/standard_scaler.md | 3 +
.../src/test/assembly/test-assembly.xml | 2 +-
.../flink/configuration/ConfigurationTest.java | 2 +-
flink-dist/src/main/assemblies/bin.xml | 2 +-
flink-examples/flink-java-examples/pom.xml | 4 +-
.../examples/java/relational/TPCHQuery3.java | 2 +-
flink-examples/flink-scala-examples/pom.xml | 6 +-
.../aggregation/AvgAggregationFunction.java | 2 +-
.../flink/api/java/tuple/TupleGenerator.java | 8 +-
flink-java8/pom.xml | 2 +-
.../optimizer/traversals/package-info.java | 2 +-
.../main/resources/archetype-resources/pom.xml | 8 +-
.../archetype-resources/src/main/java/Job.java | 4 +-
.../main/resources/archetype-resources/pom.xml | 8 +-
.../src/main/scala/Job.scala | 4 +-
.../src/main/java/LocalJob.java | 4 +-
.../src/main/java/YarnJob.java | 4 +-
.../messages/checkpoint/package-info.java | 2 +-
.../flink/runtime/messages/package-info.java | 2 +-
.../flink/runtime/util/JarFileCreator.java | 4 +-
.../flink/runtime/util/JarFileCreatorTest.java | 2 +-
.../org/apache/flink/api/scala/DataSet.scala | 2 +-
.../src/test/assembly/test-assembly.xml | 2 +-
.../datatypes/HadoopFileOutputCommitter.java | 2 +-
.../api/java/python/PythonPlanBinder.java | 4 +-
flink-staging/flink-ml/pom.xml | 4 +-
.../apache/flink/ml/classification/CoCoA.scala | 326 +++++-----
.../apache/flink/ml/common/ChainedLearner.scala | 45 --
.../flink/ml/common/ChainedTransformer.scala | 43 --
.../org/apache/flink/ml/common/Learner.scala | 38 --
.../apache/flink/ml/common/Transformer.scala | 50 --
.../ml/experimental/ChainedPredictor.scala | 67 --
.../ml/experimental/ChainedTransformer.scala | 65 --
.../flink/ml/experimental/Estimator.scala | 110 ----
.../apache/flink/ml/experimental/KMeans.scala | 50 --
.../apache/flink/ml/experimental/Offset.scala | 50 --
.../flink/ml/experimental/Predictor.scala | 87 ---
.../apache/flink/ml/experimental/Scaler.scala | 52 --
.../flink/ml/experimental/Transformer.scala | 94 ---
.../flink/ml/feature/PolynomialBase.scala | 148 -----
.../scala/org/apache/flink/ml/math/Breeze.scala | 10 +-
.../flink/ml/math/BreezeVectorConverter.scala | 75 +++
.../org/apache/flink/ml/math/CanCopy.scala | 23 -
.../org/apache/flink/ml/math/DenseVector.scala | 4 -
.../scala/org/apache/flink/ml/math/Vector.scala | 6 -
.../apache/flink/ml/math/VectorBuilder.scala | 57 ++
.../org/apache/flink/ml/math/package.scala | 2 -
.../flink/ml/pipeline/ChainedPredictor.scala | 115 ++++
.../flink/ml/pipeline/ChainedTransformer.scala | 107 +++
.../apache/flink/ml/pipeline/Estimator.scala | 175 +++++
.../apache/flink/ml/pipeline/Predictor.scala | 149 +++++
.../apache/flink/ml/pipeline/Transformer.scala | 180 ++++++
.../ml/preprocessing/PolynomialFeatures.scala | 209 ++++++
.../flink/ml/preprocessing/StandardScaler.scala | 200 ++++--
.../apache/flink/ml/recommendation/ALS.scala | 647 ++++++++++---------
.../regression/MultipleLinearRegression.scala | 374 ++++++-----
.../src/test/resources/log4j-test.properties | 33 +-
.../flink/ml/classification/CoCoAITSuite.scala | 52 ++
.../flink/ml/classification/CoCoASuite.scala | 52 --
.../ml/experimental/SciKitPipelineSuite.scala | 70 --
.../ml/feature/PolynomialBaseITSuite.scala | 9 +-
.../flink/ml/pipeline/PipelineITSuite.scala | 146 +++++
.../preprocessing/StandardScalerITSuite.scala | 10 +-
.../flink/ml/recommendation/ALSITSuite.scala | 6 +-
.../MultipleLinearRegressionITSuite.scala | 24 +-
.../flink-streaming-examples/pom.xml | 2 +-
.../apache/flink/api/table/package-info.java | 2 +-
.../apache/flink/api/scala/table/package.scala | 2 +-
.../flink/api/table/expressions/package.scala | 2 +-
.../org/apache/flink/api/table/package.scala | 2 +-
.../apache/flink/api/table/plan/package.scala | 2 +-
.../flink/api/table/runtime/package.scala | 2 +-
.../tachyon/TachyonFileSystemWrapperTest.java | 2 +-
.../apache/flink/tez/examples/TPCHQuery3.java | 2 +-
.../test/assembly/test-custominput-assembly.xml | 2 +-
.../src/test/assembly/test-kmeans-assembly.xml | 2 +-
.../test-streamingclassloader-assembly.xml | 2 +-
.../test/recordJobTests/TPCHQuery10ITCase.java | 2 +-
.../test/recordJobTests/TPCHQuery3ITCase.java | 2 +-
.../TPCHQuery3WithUnionITCase.java | 2 +-
.../test/recordJobTests/TPCHQuery4ITCase.java | 2 +-
.../test/recordJobTests/TPCHQuery9ITCase.java | 2 +-
flink-yarn-tests/pom.xml | 2 +-
.../org/apache/flink/yarn/YarnTestBase.java | 2 +-
.../org/apache/flink/yarn/FlinkYarnClient.java | 2 +-
89 files changed, 2266 insertions(+), 1843 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/als.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/als.md b/docs/libs/ml/als.md
index 951fc3c..f827e4f 100644
--- a/docs/libs/ml/als.md
+++ b/docs/libs/ml/als.md
@@ -147,11 +147,11 @@ val parameters = ParameterMap()
.add(ALS.Seed, 42l)
// Calculate the factorization
-val factorization = als.fit(inputDS, parameters)
+als.fit(inputDS, parameters)
// Read the testing data set from a csv file
val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
// Calculate the ratings according to the matrix factorization
-val predictedRatings = factorization.transform(testingDS)
+val predictedRatings = als.predict(testingDS)
{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/cocoa.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/cocoa.md b/docs/libs/ml/cocoa.md
index 0327f8f..cd13806 100644
--- a/docs/libs/ml/cocoa.md
+++ b/docs/libs/ml/cocoa.md
@@ -146,7 +146,7 @@ The CoCoA implementation can be controlled by the following parameters:
val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
// Create the CoCoA learner
-val cocoa = CoCoA()
+val svm = CoCoA()
.setBlocks(10)
.setIterations(10)
.setLocalIterations(10)
@@ -154,11 +154,11 @@ val cocoa = CoCoA()
.setStepsize(0.5)
// Learn the SVM model
-val svm = cocoa.fit(trainingDS)
+svm.fit(trainingDS)
// Read the testing data set
val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
// Calculate the predictions for the testing data set
-val predictionDS: DataSet[LabeledVector] = model.transform(testingDS)
+val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/multiple_linear_regression.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/multiple_linear_regression.md b/docs/libs/ml/multiple_linear_regression.md
index 840e899..c11425d 100644
--- a/docs/libs/ml/multiple_linear_regression.md
+++ b/docs/libs/ml/multiple_linear_regression.md
@@ -117,8 +117,8 @@ val trainingDS: DataSet[LabeledVector] = ...
val testingDS: DataSet[Vector] = ...
// Fit the linear model to the provided data
-val model = mlr.fit(trainingDS)
+mlr.fit(trainingDS)
// Calculate the predictions for the test data
-val predictions = model.transform(testingDS)
+val predictions = mlr.predict(testingDS)
{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/polynomial_base_feature_mapper.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/polynomial_base_feature_mapper.md b/docs/libs/ml/polynomial_base_feature_mapper.md
index 2964f04..801e3cf 100644
--- a/docs/libs/ml/polynomial_base_feature_mapper.md
+++ b/docs/libs/ml/polynomial_base_feature_mapper.md
@@ -84,8 +84,8 @@ val parameters = ParameterMap()
.add(MultipleLinearRegression.Stepsize, 0.5)
// Create pipeline PolynomialBase -> MultipleLinearRegression
-val chained = polyBase.chain(mlr)
+val pipeline = polyBase.chainPredictor(mlr)
// Learn the model
-val model = chained.fit(trainingDS)
+pipeline.fit(trainingDS)
{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/standard_scaler.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/standard_scaler.md b/docs/libs/ml/standard_scaler.md
index aae4620..2ea21d6 100644
--- a/docs/libs/ml/standard_scaler.md
+++ b/docs/libs/ml/standard_scaler.md
@@ -85,6 +85,9 @@ val scaler = StandardScaler()
// Obtain data set to be scaled
val dataSet: DataSet[Vector] = ...
+// Learn the mean and standard deviation of the training data
+scaler.fit(dataSet)
+
// Scale the provided data set to have mean=10.0 and std=2.0
val scaledDS = scaler.transform(dataSet)
{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-clients/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/assembly/test-assembly.xml b/flink-clients/src/test/assembly/test-assembly.xml
index 60b27d2..aa7b7d1 100644
--- a/flink-clients/src/test/assembly/test-assembly.xml
+++ b/flink-clients/src/test/assembly/test-assembly.xml
@@ -27,7 +27,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
- <!--modify/add include to match your pipeline(s) -->
+ <!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/client/testjar/**</include>
</includes>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index f7039c7..e131892 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
/**
- * This class contains test for the configuration pipeline. In particular, the serialization of {@link Configuration}
+ * This class contains test for the configuration package. In particular, the serialization of {@link Configuration}
* objects is tested.
*/
public class ConfigurationTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index d9dacf3..6a429ee 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -130,7 +130,7 @@ under the License.
</excludes>
</fileSet>
<fileSet>
- <!-- copy python pipeline -->
+ <!-- copy python package -->
<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory>
<outputDirectory>resources/python/</outputDirectory>
<fileMode>0755</fileMode>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index 2a335b5..a7964cb 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -205,7 +205,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery10</id>
- <phase>pipeline</phase>
+ <phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -228,7 +228,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery3</id>
- <phase>pipeline</phase>
+ <phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index e66493a..9a6e58c 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -135,7 +135,7 @@ public class TPCHQuery3 {
}
});
- // Join customers with orders and pipeline them into a ShippingPriorityItem
+ // Join customers with orders and package them into a ShippingPriorityItem
DataSet<ShippingPriorityItem> customerWithOrders =
customers.join(orders).where(0).equalTo(1)
.with(
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index 14fa874..5127c48 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -170,7 +170,7 @@ under the License.
</configuration>
</plugin>
- <!-- get default data from flink-java-examples pipeline -->
+ <!-- get default data from flink-java-examples package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -357,7 +357,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery10</id>
- <phase>pipeline</phase>
+ <phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
@@ -380,7 +380,7 @@ under the License.
<!--
<execution>
<id>TPCHQuery3</id>
- <phase>pipeline</phase>
+ <phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
index 1472cd9..b433d66 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
@@ -17,7 +17,7 @@ package org.apache.flink.api.java.aggregation;
* limitations under the License.
*/
-//pipeline org.apache.flink.api.java.aggregation;
+//package org.apache.flink.api.java.aggregation;
//
//
//public abstract class AvgAggregationFunction<T> extends AggregationFunction<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index 2149180..03826fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -524,8 +524,8 @@ class TupleGenerator {
// head
w.print(HEADER);
- // pipeline and imports
- w.println("pipeline " + PACKAGE + ';');
+ // package and imports
+ w.println("package " + PACKAGE + ';');
w.println();
w.println("import org.apache.flink.util.StringUtils;");
w.println();
@@ -780,8 +780,8 @@ class TupleGenerator {
// head
w.print(HEADER);
- // pipeline and imports
- w.println("pipeline " + PACKAGE + "." + BUILDER_SUFFIX + ';');
+ // package and imports
+ w.println("package " + PACKAGE + "." + BUILDER_SUFFIX + ';');
w.println();
w.println("import java.util.LinkedList;");
w.println("import java.util.List;");
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 5293221..0d82ea2 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -101,7 +101,7 @@ under the License.
</configuration>
</plugin>
- <!-- get default data from flink-java-examples pipeline -->
+ <!-- get default data from flink-java-examples package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
index d125475..cd8766c 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
@@ -17,7 +17,7 @@
*/
/**
- * This pipeline contains the various traversals over the program plan and the
+ * This package contains the various traversals over the program plan and the
* optimizer DAG (directed acyclic graph) that are made in the course of
* the optimization.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index b1e3fac..30f2315 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -48,7 +48,7 @@ under the License.
<!--
- Execute "mvn clean pipeline -Pbuild-jar"
+ Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
How to use the Flink Quickstart pom:
@@ -61,11 +61,11 @@ under the License.
b) Build a jar for running on the cluster:
There are two options for creating a jar from this project
- b.1) "mvn clean pipeline" -> this will create a fat jar which contains all
+ b.1) "mvn clean package" -> this will create a fat jar which contains all
dependencies necessary for running the jar created by this pom in a cluster.
The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
- b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much
+ b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
nicer dependency exclusion handling. This approach is preferred and leads to
much cleaner jar files.
-->
@@ -98,7 +98,7 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
- <!-- Run shade goal on pipeline phase -->
+ <!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
index 6e813a4..603fc80 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
* Skeleton for a Flink Job.
*
* For a full example of a Flink Job, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
- * mvn clean pipeline
+ * mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 299fa24..e940b90 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -49,7 +49,7 @@ under the License.
<!--
- Execute "mvn clean pipeline -Pbuild-jar"
+ Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
How to use the Flink Quickstart pom:
@@ -62,11 +62,11 @@ under the License.
b) Build a jar for running on the cluster:
There are two options for creating a jar from this project
- b.1) "mvn clean pipeline" -> this will create a fat jar which contains all
+ b.1) "mvn clean package" -> this will create a fat jar which contains all
dependencies necessary for running the jar created by this pom in a cluster.
The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
- b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much
+ b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
nicer dependency exclusion handling. This approach is preferred and leads to
much cleaner jar files.
-->
@@ -102,7 +102,7 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
- <!-- Run shade goal on pipeline phase -->
+ <!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
index 44a7a03..3c34b0a 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -24,12 +24,12 @@ import org.apache.flink.api.scala._
* Skeleton for a Flink Job.
*
* For a full example of a Flink Job, see the WordCountJob.scala file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster. Just type
* {{{
- * mvn clean pipeline
+ * mvn clean package
* }}}
* in the projects root directory. You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
index e324420..cf7474e 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
@@ -24,12 +24,12 @@ import org.apache.flink.tez.client.LocalTezEnvironment;
* Skeleton for a Flink on Tez Job running using Tez local mode.
*
* For a full example of a Flink on TezJob, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
- * mvn clean pipeline
+ * mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
index 1b0bbcf..51627d5 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
@@ -24,12 +24,12 @@ import org.apache.flink.tez.client.RemoteTezEnvironment;
* Skeleton for a Flink on Tez program running on Yarn.
*
* For a full example of a Flink on Tez program, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
*
* You can also generate a .jar file that you can submit on your Flink
* cluster.
* Just type
- * mvn clean pipeline
+ * mvn clean package
* in the projects root directory.
* You will find the jar in
* target/flink-quickstart-0.1-SNAPSHOT-Sample.jar
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
index 7e422b2..7b96b81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
@@ -17,7 +17,7 @@
*/
/**
- * This pipeline contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * This package contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
* and {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the checkpoint snapshots of the
* distributed dataflow.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
index 78620a8..e0b8cce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
@@ -17,7 +17,7 @@
*/
/**
- * This pipeline contains the messages that are sent between actors, like the
+ * This package contains the messages that are sent between actors, like the
* {@link org.apache.flink.runtime.jobmanager.JobManager} and
* {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the distributed operations.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
index 9e39777..c55a9dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
@@ -91,10 +91,10 @@ public class JarFileCreator {
}
/**
- * Manually specify the pipeline of the dependencies.
+ * Manually specify the package of the dependencies.
*
* @param p
- * the pipeline to be included.
+ * the package to be included.
*/
public synchronized JarFileCreator addPackage(String p) {
this.packages.add(p);
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index 2e39068..ba207ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -199,7 +199,7 @@ public class JarFileCreatorTest {
ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
ans.add("org/apache/flink/util/Collector.class");
- Assert.assertTrue("Jar file for UDF pipeline is not correct", validate(ans, out));
+ Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out));
out.delete();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index e146687..e283e95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -67,7 +67,7 @@ import scala.reflect.ClassTag
*
* A rich function can be used when more control is required, for example for accessing the
* `RuntimeContext`. The rich function for `flatMap` is `RichFlatMapFunction`, all other functions
- * are named similarly. All functions are available in pipeline
+ * are named similarly. All functions are available in package
* `org.apache.flink.api.common.functions`.
*
* The elements are partitioned depending on the parallelism of the
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
index 86563b0..0f4561a 100644
--- a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
+++ b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
@@ -27,7 +27,7 @@ under the License.
<fileSet>
<directory>${project.build.testOutputDirectory}</directory>
<outputDirectory>/</outputDirectory>
- <!--modify/add include to match your pipeline(s) -->
+ <!--modify/add include to match your package(s) -->
<includes>
<include>org/apache/flink/api/avro/testjar/**</include>
</includes>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
index b9c28c5..ce4955c 100644
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.util.StringUtils;
/**
* Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext}
- * as input parameter. However JobContext class is pipeline private, and in Hadoop 2.2.0 it's public.
+ * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public.
* This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks.
*/
public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
index 7d180b6..c278f5c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
@@ -125,14 +125,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
//=====Setup========================================================================================================
/**
* Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big
- * pipeline, and resolves PYTHONPATH issues.
+ * package, and resolves PYTHONPATH issues.
*
* @param filePaths
* @throws IOException
* @throws URISyntaxException
*/
private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
- //Flink python pipeline
+ //Flink python package
String tempFilePath = FLINK_PYTHON_FILE_PATH;
clearPath(tempFilePath);
FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false);
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml
index bc61e2e..43e6605 100644
--- a/flink-staging/flink-ml/pom.xml
+++ b/flink-staging/flink-ml/pom.xml
@@ -117,7 +117,7 @@
</goals>
<configuration>
<suffixes>(?<!(IT|Integration))(Test|Suite|Case)</suffixes>
- <argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -XX:-UseGCOverheadLimit</argLine>
+ <argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</execution>
<execution>
@@ -128,7 +128,7 @@
</goals>
<configuration>
<suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
- <argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -XX:-UseGCOverheadLimit</argLine>
+ <argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
</configuration>
</execution>
</executions>
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
index e1c2053..4ba9299 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
@@ -18,6 +18,8 @@
package org.apache.flink.ml.classification
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
@@ -66,60 +68,63 @@ import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
* {{{
* val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
*
- * val cocoa = CoCoA()
+ * val svm = CoCoA()
* .setBlocks(10)
* .setIterations(10)
* .setLocalIterations(10)
* .setRegularization(0.5)
* .setStepsize(0.5)
*
- * val svm = cocoa.fit(trainingDS)
+ * svm.fit(trainingDS)
*
* val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
*
- * val predictionDS: DataSet[LabeledVector] = model.transform(testingDS)
+ * val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
* }}}
*
* =Parameters=
*
- * - [[CoCoA.Blocks]]:
+ * - [[org.apache.flink.ml.classification.CoCoA.Blocks]]:
* Sets the number of blocks into which the input data will be split. On each block the local
* stochastic dual coordinate ascent method is executed. This number should be set at least to
* the degree of parallelism. If no value is specified, then the parallelism of the input
* [[DataSet]] is used as the number of blocks. (Default value: '''None''')
*
- * - [[CoCoA.Iterations]]:
+ * - [[org.apache.flink.ml.classification.CoCoA.Iterations]]:
* Defines the maximum number of iterations of the outer loop method. In other words, it defines
* how often the SDCA method is applied to the blocked data. After each iteration, the locally
* computed weight vector updates have to be reduced to update the global weight vector value.
* The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
* (Default value: '''10''')
*
- * - [[CoCoA.LocalIterations]]:
+ * - [[org.apache.flink.ml.classification.CoCoA.LocalIterations]]:
* Defines the maximum number of SDCA iterations. In other words, it defines how many data points
* are drawn from each local data block to calculate the stochastic dual coordinate ascent.
* (Default value: '''10''')
*
- * - [[CoCoA.Regularization]]:
+ * - [[org.apache.flink.ml.classification.CoCoA.Regularization]]:
* Defines the regularization constant of the CoCoA algorithm. The higher the value, the smaller
* will the 2-norm of the weight vector be. In case of a SVM with hinge loss this means that the
* SVM margin will be wider even though it might contain some false classifications.
* (Default value: '''1.0''')
*
- * - [[CoCoA.Stepsize]]:
+ * - [[org.apache.flink.ml.classification.CoCoA.Stepsize]]:
* Defines the initial step size for the updates of the weight vector. The larger the step size
* is, the larger will be the contribution of the weight vector updates to the next weight vector
* value. The effective scaling of the updates is `stepsize/blocks`. This value has to be tuned
* in case that the algorithm becomes instable. (Default value: '''1.0''')
*
- * - [[CoCoA.Seed]]:
+ * - [[org.apache.flink.ml.classification.CoCoA.Seed]]:
* Defines the seed to initialize the random number generator. The seed directly controls which
* data points are chosen for the SDCA method. (Default value: '''0''')
*/
-class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
+class CoCoA extends Predictor[CoCoA] {
import CoCoA._
+ /** Stores the learned weight vector after the fit operation */
+ var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
+
/** Sets the number of data blocks/partitions
*
* @param blocks
@@ -179,73 +184,168 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
parameters.add(Seed, seed)
this
}
+}
- /** Trains a SVM with soft-margin based on the given training data set.
+/** Companion object of CoCoA. Contains convenience functions and the parameter type definitions
+ * of the algorithm.
+ */
+object CoCoA{
+ val WEIGHT_VECTOR ="weightVector"
+
+ // ========================================== Parameters =========================================
+
+ case object Blocks extends Parameter[Int] {
+ val defaultValue: Option[Int] = None
+ }
+
+ case object Iterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object LocalIterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object Regularization extends Parameter[Double] {
+ val defaultValue = Some(1.0)
+ }
+
+ case object Stepsize extends Parameter[Double] {
+ val defaultValue = Some(1.0)
+ }
+
+ case object Seed extends Parameter[Long] {
+ val defaultValue = Some(0L)
+ }
+
+ // ========================================== Factory methods ====================================
+
+ def apply(): CoCoA = {
+ new CoCoA()
+ }
+
+ // ========================================== Operations =========================================
+
+ /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
+ * [[LabeledVector]]
*
- * @param input Training data set
- * @param fitParameters Parameter values
- * @return Trained SVM model
+ * @tparam T Subtype of [[Vector]]
+ * @return
+ */
+ implicit def predictValues[T <: Vector] = {
+ new PredictOperation[CoCoA, T, LabeledVector]{
+ override def predict(
+ instance: CoCoA,
+ predictParameters: ParameterMap,
+ input: DataSet[T])
+ : DataSet[LabeledVector] = {
+
+ instance.weightsOption match {
+ case Some(weights) => {
+ input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
+ }
+ }
+ }
+ }
+ }
+
+ /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
+ * we broadcast the weight vector to all mappers.
*/
- override def fit(input: DataSet[LabeledVector], fitParameters: ParameterMap): CoCoAModel = {
- val resultingParameters = this.parameters ++ fitParameters
+ class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
- // Check if the number of blocks/partitions has been specified
- val blocks = resultingParameters.get(Blocks) match {
- case Some(value) => value
- case None => input.getParallelism
+ var weights: BreezeDenseVector[Double] = _
+
+ @throws(classOf[Exception])
+ override def open(configuration: Configuration): Unit = {
+ // get current weights
+ weights = getRuntimeContext.
+ getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
}
- val scaling = resultingParameters(Stepsize)/blocks
- val iterations = resultingParameters(Iterations)
- val localIterations = resultingParameters(LocalIterations)
- val regularization = resultingParameters(Regularization)
- val seed = resultingParameters(Seed)
-
- // Obtain DataSet with the dimension of the data points
- val dimension = input.map{_.vector.size}.reduce{
- (a, b) => {
- require(a == b, "Dimensions of feature vectors have to be equal.")
- a
- }
+ override def map(vector: T): LabeledVector = {
+ // calculate the prediction value (scaled distance from the separating hyperplane)
+ val dotProduct = weights dot vector.asBreeze
+
+ LabeledVector(dotProduct, vector)
}
+ }
- val initialWeights = createInitialWeights(dimension)
-
- // Count the number of vectors, but keep the value in a DataSet to broadcast it later
- // TODO: Once efficient count and intermediate result partitions are implemented, use count
- val numberVectors = input map { x => 1 } reduce { _ + _ }
-
- // Group the input data into blocks in round robin fashion
- val blockedInputNumberElements = FlinkTools.block(input, blocks, Some(ModuloKeyPartitioner)).
- cross(numberVectors).
- map { x => x }
-
- val resultingWeights = initialWeights.iterate(iterations) {
- weights => {
- // compute the local SDCA to obtain the weight vector updates
- val deltaWs = localDualMethod(
- weights,
- blockedInputNumberElements,
- localIterations,
- regularization,
- scaling,
- seed
- )
-
- // scale the weight vectors
- val weightedDeltaWs = deltaWs map {
- deltaW => {
- deltaW :*= scaling
+ /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
+ *
+ */
+ implicit val fitCoCoA = {
+ new FitOperation[CoCoA, LabeledVector] {
+ override def fit(
+ instance: CoCoA,
+ fitParameters: ParameterMap,
+ input: DataSet[LabeledVector])
+ : Unit = {
+ val resultingParameters = instance.parameters ++ fitParameters
+
+ // Check if the number of blocks/partitions has been specified
+ val blocks = resultingParameters.get(Blocks) match {
+ case Some(value) => value
+ case None => input.getParallelism
+ }
+
+ val scaling = resultingParameters(Stepsize)/blocks
+ val iterations = resultingParameters(Iterations)
+ val localIterations = resultingParameters(LocalIterations)
+ val regularization = resultingParameters(Regularization)
+ val seed = resultingParameters(Seed)
+
+ // Obtain DataSet with the dimension of the data points
+ val dimension = input.map{_.vector.size}.reduce{
+ (a, b) => {
+ require(a == b, "Dimensions of feature vectors have to be equal.")
+ a
}
}
- // calculate the new weight vector by adding the weight vector updates to the weight vector
- // value
- weights.union(weightedDeltaWs).reduce { _ + _ }
+ val initialWeights = createInitialWeights(dimension)
+
+ // Count the number of vectors, but keep the value in a DataSet to broadcast it later
+ // TODO: Once efficient count and intermediate result partitions are implemented, use count
+ val numberVectors = input map { x => 1 } reduce { _ + _ }
+
+ // Group the input data into blocks in round robin fashion
+ val blockedInputNumberElements = FlinkTools.block(
+ input,
+ blocks,
+ Some(ModuloKeyPartitioner)).
+ cross(numberVectors).
+ map { x => x }
+
+ val resultingWeights = initialWeights.iterate(iterations) {
+ weights => {
+ // compute the local SDCA to obtain the weight vector updates
+ val deltaWs = localDualMethod(
+ weights,
+ blockedInputNumberElements,
+ localIterations,
+ regularization,
+ scaling,
+ seed
+ )
+
+ // scale the weight vectors
+ val weightedDeltaWs = deltaWs map {
+ deltaW => {
+ deltaW :*= scaling
+ }
+ }
+
+ // calculate the new weight vector by adding the weight vector updates to the weight
+ // vector value
+ weights.union(weightedDeltaWs).reduce { _ + _ }
+ }
+ }
+
+ // Store the learned weight vector in the given instance
+ instance.weightsOption = Some(resultingWeights)
}
}
-
- CoCoAModel(resultingWeights)
}
/** Creates a zero vector of length dimension
@@ -270,13 +370,13 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
* @return [[DataSet]] of weight vector updates. The weight vector updates are double arrays
*/
private def localDualMethod(
- w: DataSet[BreezeDenseVector[Double]],
- blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
- localIterations: Int,
- regularization: Double,
- scaling: Double,
- seed: Long)
- : DataSet[BreezeDenseVector[Double]] = {
+ w: DataSet[BreezeDenseVector[Double]],
+ blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
+ localIterations: Int,
+ regularization: Double,
+ scaling: Double,
+ seed: Long)
+ : DataSet[BreezeDenseVector[Double]] = {
/*
Rich mapper calculating for each data block the local SDCA. We use a RichMapFunction here,
because we broadcast the current value of the weight vector to all mappers.
@@ -300,7 +400,7 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
}
override def map(blockNumberElements: (Block[LabeledVector], Int))
- : BreezeDenseVector[Double] = {
+ : BreezeDenseVector[Double] = {
val (block, numberElements) = blockNumberElements
// check if we already processed a data block with the corresponding block index
@@ -406,87 +506,5 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
(0.0 , BreezeVector.zeros(w.length))
}
}
-}
-
-/** Companion object of CoCoA. Contains convenience functions and the parameter type definitions
- * of the algorithm.
- */
-object CoCoA{
- val WEIGHT_VECTOR ="weightVector"
-
- case object Blocks extends Parameter[Int] {
- val defaultValue: Option[Int] = None
- }
-
- case object Iterations extends Parameter[Int] {
- val defaultValue = Some(10)
- }
-
- case object LocalIterations extends Parameter[Int] {
- val defaultValue = Some(10)
- }
-
- case object Regularization extends Parameter[Double] {
- val defaultValue = Some(1.0)
- }
-
- case object Stepsize extends Parameter[Double] {
- val defaultValue = Some(1.0)
- }
-
- case object Seed extends Parameter[Long] {
- val defaultValue = Some(0L)
- }
-
- def apply(): CoCoA = {
- new CoCoA()
- }
-}
-
-/** Resulting SVM model calculated by the CoCoA algorithm.
- *
- * @param weights Calculated weight vector representing the separating hyperplane of the
- * classification task.
- */
-case class CoCoAModel(weights: DataSet[BreezeDenseVector[Double]])
- extends Transformer[Vector, LabeledVector]
- with Serializable {
- import CoCoA.WEIGHT_VECTOR
-
- /** Calculates the prediction value of the SVM value (not the label)
- *
- * @param input [[DataSet]] containing the vector for which to calculate the predictions
- * @param parameters Parameter values for the algorithm
- * @return [[DataSet]] containing the labeled vectors
- */
- override def transform(input: DataSet[Vector], parameters: ParameterMap):
- DataSet[LabeledVector] = {
- input.map(new PredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
- }
-}
-
-/** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
- * we broadcast the weight vector to all mappers.
- */
-class PredictionMapper extends RichMapFunction[Vector, LabeledVector] {
-
- import CoCoA.WEIGHT_VECTOR
-
- var weights: BreezeDenseVector[Double] = _
-
- @throws(classOf[Exception])
- override def open(configuration: Configuration): Unit = {
- // get current weights
- weights = getRuntimeContext.
- getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
- }
-
- override def map(vector: Vector): LabeledVector = {
- // calculate the prediction value (scaled distance from the separating hyperplane)
- val dotProduct = weights dot vector.asBreeze
- LabeledVector(dotProduct, vector)
- }
}
-
-
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
deleted file mode 100644
index cf1b51a..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** This class represents a [[org.apache.flink.ml.common.Learner]] which is chained to a
- * [[Transformer]].
- *
- * Calling the method `fit` on this object will pipe the input data through the given
- * [[Transformer]], whose output is fed to the [[Learner]].
- *
- * @param head Preceding [[Transformer]] pipeline
- * @param tail [[Learner]] instance
- * @tparam IN Type of the training data
- * @tparam TEMP Type of the produced data by the transformer pipeline and input type to the
- * [[Learner]]
- * @tparam OUT Type of the trained model
- */
-class ChainedLearner[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
- val tail: Learner[TEMP, OUT])
- extends Learner[IN, OUT] {
-
- override def fit(input: DataSet[IN], fitParameters: ParameterMap): OUT = {
- val tempResult = head.transform(input, fitParameters)
-
- tail.fit(tempResult, fitParameters)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
deleted file mode 100644
index 0658876..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** This class represents a chain of multiple [[Transformer]].
- *
- * Calling the method `transform` on this object will first apply the preceding [[Transformer]] to
- * the input data. The resulting output data is then fed to the succeeding [[Transformer]].
- *
- * @param head Preceding [[Transformer]]
- * @param tail Succeeding [[Transformer]]
- * @tparam IN Type of incoming elements
- * @tparam TEMP Type of output elements of the preceding [[Transformer]] and input type of
- * succeeding [[Transformer]]
- * @tparam OUT Type of outgoing elements
- */
-class ChainedTransformer[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
- val tail: Transformer[TEMP, OUT])
- extends Transformer[IN, OUT] {
-
- override def transform(input: DataSet[IN], transformParameters: ParameterMap): DataSet[OUT] = {
- val tempResult = head.transform(input, transformParameters)
- tail.transform(tempResult)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
deleted file mode 100644
index a081f76..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** Base trait for an algorithm which trains a model based on some training data
- *
- * The idea is that all algorithms which train a model implement this trait. That way
- * they can be chained with [[Transformer]] which act as a preprocessing step for the actual
- * learning. In that sense, [[Learner]] denote the end of a pipeline and cannot be further
- * chained.
- *
- * Every learner has to implement the `fit` method which takes the training data and learns
- * a model from the data.
- *
- * @tparam IN Type of the training data
- * @tparam OUT Type of the trained model
- */
-trait Learner[IN, OUT] extends WithParameters {
- def fit(input: DataSet[IN], fitParameters: ParameterMap = ParameterMap.Empty): OUT
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
deleted file mode 100644
index 6b8780d..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** Base trait for an algorithm which transforms the input data to some output data.
- *
- * A [[Transformer]] is used to transform input data to some output data. Transformations might
- * be feature extractors, feature mappings, whitening or centralization just to name a few.
- *
- * [[Transformer]] can be chained with other [[Transformer]] creating a [[ChainedTransformer]],
- * which again can be chained. Chaining a [[Transformer]] with a [[Learner]] creates a
- * [[ChainedLearner]] which terminates a pipeline.
- *
- * A [[Transformer]] implementation has to implement the method `transform`, which defines how
- * the input data is transformed into the output data.
- *
- * @tparam IN Type of incoming elements
- * @tparam OUT Type of outgoing elements
- */
-trait Transformer[IN, OUT] extends WithParameters {
- def chain[CHAINED](transformer: Transformer[OUT, CHAINED]):
- ChainedTransformer[IN, OUT, CHAINED] = {
- new ChainedTransformer[IN, OUT, CHAINED](this, transformer)
- }
-
- def chain[CHAINED](learner: Learner[OUT, CHAINED]): ChainedLearner[IN, OUT, CHAINED] = {
- new ChainedLearner[IN, OUT, CHAINED](this, learner)
- }
-
- def transform(input: DataSet[IN], transformParameters: ParameterMap = ParameterMap.Empty):
- DataSet[OUT]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala
deleted file mode 100644
index 2f36e8c..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-
-case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P)
- extends Predictor[ChainedPredictor[T, P]]{}
-
-object ChainedPredictor{
- implicit def chainedPredictOperation[
- T <: Transformer[T],
- P <: Predictor[P],
- Input,
- Testing,
- Prediction](
- implicit transform: TransformOperation[T, Input, Testing],
- predictor: PredictOperation[P, Testing, Prediction])
- : PredictOperation[ChainedPredictor[T, P], Input, Prediction] = {
-
- new PredictOperation[ChainedPredictor[T, P], Input, Prediction] {
- override def predict(
- instance: ChainedPredictor[T, P],
- predictParameters: ParameterMap,
- input: DataSet[Input])
- : DataSet[Prediction] = {
-
- val testing = instance.transformer.transform(input, predictParameters)
- instance.predictor.predict(testing, predictParameters)
- }
- }
- }
-
- implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit
- leftFitOperation: FitOperation[L, I],
- leftTransformOperation: TransformOperation[L, I, T],
- rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = {
- new FitOperation[ChainedTransformer[L, R], I] {
- override def fit(
- instance: ChainedTransformer[L, R],
- fitParameters: ParameterMap,
- input: DataSet[I])
- : Unit = {
- instance.left.fit(input, fitParameters)
- val intermediateResult = instance.left.transform(input, fitParameters)
- instance.right.fit(intermediateResult, fitParameters)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala
deleted file mode 100644
index dc9c611..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-
-case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L, right: R)
- extends Transformer[ChainedTransformer[L, R]] {
-}
-
-object ChainedTransformer{
- implicit def chainedTransformOperation[
- L <: Transformer[L],
- R <: Transformer[R],
- I,
- T,
- O](implicit
- transformLeft: TransformOperation[L, I, T],
- transformRight: TransformOperation[R, T, O])
- : TransformOperation[ChainedTransformer[L,R], I, O] = {
-
- new TransformOperation[ChainedTransformer[L, R], I, O] {
- override def transform(
- chain: ChainedTransformer[L, R],
- transformParameters: ParameterMap,
- input: DataSet[I]): DataSet[O] = {
- val intermediateResult = transformLeft.transform(chain.left, transformParameters, input)
- transformRight.transform(chain.right, transformParameters, intermediateResult)
- }
- }
- }
-
- implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit
- leftFitOperation: FitOperation[L, I],
- leftTransformOperation: TransformOperation[L, I, T],
- rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = {
- new FitOperation[ChainedTransformer[L, R], I] {
- override def fit(
- instance: ChainedTransformer[L, R],
- fitParameters: ParameterMap,
- input: DataSet[I]): Unit = {
- instance.left.fit(input, fitParameters)
- val intermediateResult = instance.left.transform(input, fitParameters)
- instance.right.fit(intermediateResult, fitParameters)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala
deleted file mode 100644
index e0c81a4..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{ParameterMap, WithParameters}
-
-trait Estimator[Self] extends WithParameters with Serializable {
- that: Self =>
-
- def fit[Training](
- input: DataSet[Training],
- fitParameters: ParameterMap = ParameterMap.Empty)(implicit
- fitOperation: FitOperation[Self, Training]): Unit = {
- fitOperation.fit(this, fitParameters, input)
- }
-}
-
-object Estimator{
- implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag]
- : FitOperation[Self, Training] = {
- new FitOperation[Self, Training]{
- override def fit(
- instance: Self,
- fitParameters: ParameterMap,
- input: DataSet[Training])
- : Unit = {
- new FitOperation[Self, Training] {
- override def fit(
- instance: Self,
- fitParameters: ParameterMap,
- input: DataSet[Training])
- : Unit = {
- val self = implicitly[ClassTag[Self]]
- val training = implicitly[ClassTag[Training]]
-
- throw new RuntimeException("There is no FitOperation defined for " + self.runtimeClass +
- " which trains on a DataSet[" + training.runtimeClass + "]")
- }
- }
- }
- }
- }
-
- implicit def fallbackChainedFitOperationTransformer[
- L <: Transformer[L],
- R <: Transformer[R],
- LI,
- LO,
- RI](implicit
- leftFitOperation: FitOperation[L, LI],
- leftTransformOperation: TransformOperation[L, LI, LO],
- rightFitOperaiton: FitOperation[R, RI])
- : FitOperation[ChainedTransformer[L, R], LI] = {
- new FitOperation[ChainedTransformer[L, R], LI] {
- override def fit(
- instance: ChainedTransformer[L, R],
- fitParameters: ParameterMap,
- input: DataSet[LI]): Unit = {
- instance.left.fit(input, fitParameters)
- instance.left.transform(input, fitParameters)
- instance.right.fit(null, fitParameters)
- }
- }
- }
-
- implicit def fallbackChainedFitOperationPredictor[
- L <: Transformer[L],
- R <: Predictor[R],
- LI,
- LO,
- RI](implicit
- leftFitOperation: FitOperation[L, LI],
- leftTransformOperation: TransformOperation[L, LI, LO],
- rightFitOperaiton: FitOperation[R, RI])
- : FitOperation[ChainedPredictor[L, R], LI] = {
- new FitOperation[ChainedPredictor[L, R], LI] {
- override def fit(
- instance: ChainedPredictor[L, R],
- fitParameters: ParameterMap,
- input: DataSet[LI]): Unit = {
- instance.transformer.fit(input, fitParameters)
- instance.transformer.transform(input, fitParameters)
- instance.predictor.fit(null, fitParameters)
- }
- }
- }
-}
-
-trait FitOperation[Self, Training]{
- def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala
deleted file mode 100644
index 5acd34f..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.common.{ParameterMap, LabeledVector}
-import org.apache.flink.ml.math._
-
-class KMeans extends Predictor[KMeans] {
-}
-
-object KMeans{
-
- implicit val kMeansEstimator = new FitOperation[KMeans, LabeledVector] {
- override def fit(
- instance: KMeans,
- parameters: ParameterMap,
- input: DataSet[LabeledVector]): Unit = {
- input.print
- }
- }
-
- implicit def kMeansPredictor[V <: Vector]
- = new PredictOperation[KMeans, V, LabeledVector] {
- override def predict(
- instance: KMeans,
- parameters: ParameterMap,
- input: DataSet[V]): DataSet[LabeledVector] = {
- input.map{
- vector => LabeledVector(1.0, vector)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala
deleted file mode 100644
index c9d082f..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.math._
-
-class Offset extends Transformer[Offset] {
-}
-
-object Offset{
- import Breeze._
-
- implicit def offsetTransform[I <: Vector : CanCopy: ClassTag: TypeInformation]
- = new TransformOperation[Offset, I, I] {
- override def transform(
- offset: Offset,
- parameters: ParameterMap,
- input: DataSet[I]): DataSet[I] = {
- input.map{
- vector =>
- val brz = copy(vector).asBreeze
-
- val result = brz + 1.0
-
- result.fromBreeze.asInstanceOf[I]
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala
deleted file mode 100644
index 8ec6665..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{ParameterMap, WithParameters}
-
-trait Predictor[Self] extends Estimator[Self] with WithParameters with Serializable {
- that: Self =>
-
- def predict[Testing, Prediction](
- input: DataSet[Testing],
- predictParameters: ParameterMap = ParameterMap.Empty)(implicit
- predictor: PredictOperation[Self, Testing, Prediction])
- : DataSet[Prediction] = {
- predictor.predict(this, predictParameters, input)
- }
-}
-
-object Predictor{
- implicit def fallbackPredictOperation[Self: ClassTag, Testing: ClassTag, Prediction: ClassTag]
- : PredictOperation[Self, Testing, Prediction] = {
- new PredictOperation[Self, Testing, Prediction] {
- override def predict(
- instance: Self,
- predictParameters: ParameterMap,
- input: DataSet[Testing])
- : DataSet[Prediction] = {
- val self = implicitly[ClassTag[Self]]
- val testing = implicitly[ClassTag[Testing]]
- val prediction = implicitly[ClassTag[Prediction]]
-
- throw new RuntimeException("There is no PredictOperation defined for " + self.runtimeClass +
- " which takes a DataSet[" + testing.runtimeClass + "] as input and returns a DataSet[" +
- prediction.runtimeClass + "]")
- }
- }
- }
-
- implicit def fallbackChainedPredictOperation[
- L <: Transformer[L],
- R <: Predictor[R],
- LI,
- LO,
- RI,
- RO](implicit
- leftTransformOperation: TransformOperation[L, LI, LO],
- rightPredictOperation: PredictOperation[R, RI, RO]
- )
- : PredictOperation[ChainedPredictor[L, R], LI, RO] = {
- new PredictOperation[ChainedPredictor[L, R], LI, RO] {
- override def predict(
- instance: ChainedPredictor[L, R],
- predictParameters: ParameterMap,
- input: DataSet[LI]): DataSet[RO] = {
- instance.transformer.transform(input, predictParameters)
- instance.predictor.predict(null, predictParameters)
- }
- }
- }
-}
-
-abstract class PredictOperation[Self, Testing, Prediction]{
- def predict(
- instance: Self,
- predictParameters: ParameterMap,
- input: DataSet[Testing])
- : DataSet[Prediction]
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala
deleted file mode 100644
index a68c5d3..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.math._
-
-class Scaler extends Transformer[Scaler] {
- var meanValue = 0.0
-}
-
-object Scaler{
- import Breeze._
-
- implicit def vTransform[T <: Vector : CanCopy: ClassTag: TypeInformation]
- = new TransformOperation[Scaler, T, T] {
- override def transform(
- instance: Scaler,
- parameters: ParameterMap,
- input: DataSet[T]): DataSet[T] = {
- input.map{
- vector =>
- val breezeVector = copy(vector).asBreeze
- instance.meanValue = instance.meanValue + breeze.stats.mean(breezeVector)
-
- breezeVector :/= instance.meanValue
-
- breezeVector.fromBreeze.asInstanceOf[T]
- }
- }
- }
-}