Posted to commits@flink.apache.org by tr...@apache.org on 2015/05/22 10:43:39 UTC

[7/7] flink git commit: [FLINK-2050] [ml] Ports existing ML algorithms to new pipeline mechanism

[FLINK-2050] [ml] Ports existing ML algorithms to new pipeline mechanism

Adds pipeline comments

Adds pipeline IT case


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e574750
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e574750
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e574750

Branch: refs/heads/master
Commit: 1e574750da55613582aacab5fbdf1a6730d95b96
Parents: fde0341
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed May 20 13:49:52 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri May 22 10:42:51 2015 +0200

----------------------------------------------------------------------
 docs/libs/ml/als.md                             |   4 +-
 docs/libs/ml/cocoa.md                           |   6 +-
 docs/libs/ml/multiple_linear_regression.md      |   4 +-
 docs/libs/ml/polynomial_base_feature_mapper.md  |   4 +-
 docs/libs/ml/standard_scaler.md                 |   3 +
 .../src/test/assembly/test-assembly.xml         |   2 +-
 .../flink/configuration/ConfigurationTest.java  |   2 +-
 flink-dist/src/main/assemblies/bin.xml          |   2 +-
 flink-examples/flink-java-examples/pom.xml      |   4 +-
 .../examples/java/relational/TPCHQuery3.java    |   2 +-
 flink-examples/flink-scala-examples/pom.xml     |   6 +-
 .../aggregation/AvgAggregationFunction.java     |   2 +-
 .../flink/api/java/tuple/TupleGenerator.java    |   8 +-
 flink-java8/pom.xml                             |   2 +-
 .../optimizer/traversals/package-info.java      |   2 +-
 .../main/resources/archetype-resources/pom.xml  |   8 +-
 .../archetype-resources/src/main/java/Job.java  |   4 +-
 .../main/resources/archetype-resources/pom.xml  |   8 +-
 .../src/main/scala/Job.scala                    |   4 +-
 .../src/main/java/LocalJob.java                 |   4 +-
 .../src/main/java/YarnJob.java                  |   4 +-
 .../messages/checkpoint/package-info.java       |   2 +-
 .../flink/runtime/messages/package-info.java    |   2 +-
 .../flink/runtime/util/JarFileCreator.java      |   4 +-
 .../flink/runtime/util/JarFileCreatorTest.java  |   2 +-
 .../org/apache/flink/api/scala/DataSet.scala    |   2 +-
 .../src/test/assembly/test-assembly.xml         |   2 +-
 .../datatypes/HadoopFileOutputCommitter.java    |   2 +-
 .../api/java/python/PythonPlanBinder.java       |   4 +-
 flink-staging/flink-ml/pom.xml                  |   4 +-
 .../apache/flink/ml/classification/CoCoA.scala  | 326 +++++-----
 .../apache/flink/ml/common/ChainedLearner.scala |  45 --
 .../flink/ml/common/ChainedTransformer.scala    |  43 --
 .../org/apache/flink/ml/common/Learner.scala    |  38 --
 .../apache/flink/ml/common/Transformer.scala    |  50 --
 .../ml/experimental/ChainedPredictor.scala      |  67 --
 .../ml/experimental/ChainedTransformer.scala    |  65 --
 .../flink/ml/experimental/Estimator.scala       | 110 ----
 .../apache/flink/ml/experimental/KMeans.scala   |  50 --
 .../apache/flink/ml/experimental/Offset.scala   |  50 --
 .../flink/ml/experimental/Predictor.scala       |  87 ---
 .../apache/flink/ml/experimental/Scaler.scala   |  52 --
 .../flink/ml/experimental/Transformer.scala     |  94 ---
 .../flink/ml/feature/PolynomialBase.scala       | 148 -----
 .../scala/org/apache/flink/ml/math/Breeze.scala |  10 +-
 .../flink/ml/math/BreezeVectorConverter.scala   |  75 +++
 .../org/apache/flink/ml/math/CanCopy.scala      |  23 -
 .../org/apache/flink/ml/math/DenseVector.scala  |   4 -
 .../scala/org/apache/flink/ml/math/Vector.scala |   6 -
 .../apache/flink/ml/math/VectorBuilder.scala    |  57 ++
 .../org/apache/flink/ml/math/package.scala      |   2 -
 .../flink/ml/pipeline/ChainedPredictor.scala    | 115 ++++
 .../flink/ml/pipeline/ChainedTransformer.scala  | 107 +++
 .../apache/flink/ml/pipeline/Estimator.scala    | 175 +++++
 .../apache/flink/ml/pipeline/Predictor.scala    | 149 +++++
 .../apache/flink/ml/pipeline/Transformer.scala  | 180 ++++++
 .../ml/preprocessing/PolynomialFeatures.scala   | 209 ++++++
 .../flink/ml/preprocessing/StandardScaler.scala | 200 ++++--
 .../apache/flink/ml/recommendation/ALS.scala    | 647 ++++++++++---------
 .../regression/MultipleLinearRegression.scala   | 374 ++++++-----
 .../src/test/resources/log4j-test.properties    |  33 +-
 .../flink/ml/classification/CoCoAITSuite.scala  |  52 ++
 .../flink/ml/classification/CoCoASuite.scala    |  52 --
 .../ml/experimental/SciKitPipelineSuite.scala   |  70 --
 .../ml/feature/PolynomialBaseITSuite.scala      |   9 +-
 .../flink/ml/pipeline/PipelineITSuite.scala     | 146 +++++
 .../preprocessing/StandardScalerITSuite.scala   |  10 +-
 .../flink/ml/recommendation/ALSITSuite.scala    |   6 +-
 .../MultipleLinearRegressionITSuite.scala       |  24 +-
 .../flink-streaming-examples/pom.xml            |   2 +-
 .../apache/flink/api/table/package-info.java    |   2 +-
 .../apache/flink/api/scala/table/package.scala  |   2 +-
 .../flink/api/table/expressions/package.scala   |   2 +-
 .../org/apache/flink/api/table/package.scala    |   2 +-
 .../apache/flink/api/table/plan/package.scala   |   2 +-
 .../flink/api/table/runtime/package.scala       |   2 +-
 .../tachyon/TachyonFileSystemWrapperTest.java   |   2 +-
 .../apache/flink/tez/examples/TPCHQuery3.java   |   2 +-
 .../test/assembly/test-custominput-assembly.xml |   2 +-
 .../src/test/assembly/test-kmeans-assembly.xml  |   2 +-
 .../test-streamingclassloader-assembly.xml      |   2 +-
 .../test/recordJobTests/TPCHQuery10ITCase.java  |   2 +-
 .../test/recordJobTests/TPCHQuery3ITCase.java   |   2 +-
 .../TPCHQuery3WithUnionITCase.java              |   2 +-
 .../test/recordJobTests/TPCHQuery4ITCase.java   |   2 +-
 .../test/recordJobTests/TPCHQuery9ITCase.java   |   2 +-
 flink-yarn-tests/pom.xml                        |   2 +-
 .../org/apache/flink/yarn/YarnTestBase.java     |   2 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |   2 +-
 89 files changed, 2266 insertions(+), 1843 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/als.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/als.md b/docs/libs/ml/als.md
index 951fc3c..f827e4f 100644
--- a/docs/libs/ml/als.md
+++ b/docs/libs/ml/als.md
@@ -147,11 +147,11 @@ val parameters = ParameterMap()
 .add(ALS.Seed, 42l)
 
 // Calculate the factorization
-val factorization = als.fit(inputDS, parameters)
+als.fit(inputDS, parameters)
 
 // Read the testing data set from a csv file
 val testingDS: DataSet[(Int, Int)] = env.readCsvFile[(Int, Int)](pathToData)
 
 // Calculate the ratings according to the matrix factorization
-val predictedRatings = factorization.transform(testingDS)
+val predictedRatings = als.predict(testingDS)
 {% endhighlight %}
\ No newline at end of file
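
Taken together, the two changes above show the new estimator-style API: `fit`
now trains the ALS instance in place instead of returning a model, and
`predict` is called on the same object. A minimal end-to-end sketch under that
reading (file paths are placeholders; only ALS.Seed is confirmed by the hunk):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.ParameterMap
    import org.apache.flink.ml.recommendation.ALS

    val env = ExecutionEnvironment.getExecutionEnvironment

    // (user, item, rating) triples
    val inputDS: DataSet[(Int, Int, Double)] =
      env.readCsvFile[(Int, Int, Double)]("/path/to/ratings.csv")

    val als = ALS()
    val parameters = ParameterMap().add(ALS.Seed, 42L)

    // fit no longer returns a factorization; it is stored inside `als`
    als.fit(inputDS, parameters)

    // (user, item) pairs to score
    val testingDS: DataSet[(Int, Int)] =
      env.readCsvFile[(Int, Int)]("/path/to/test.csv")

    val predictedRatings = als.predict(testingDS)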

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/cocoa.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/cocoa.md b/docs/libs/ml/cocoa.md
index 0327f8f..cd13806 100644
--- a/docs/libs/ml/cocoa.md
+++ b/docs/libs/ml/cocoa.md
@@ -146,7 +146,7 @@ The CoCoA implementation can be controlled by the following parameters:
 val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
 
 // Create the CoCoA learner
-val cocoa = CoCoA()
+val svm = CoCoA()
 .setBlocks(10)
 .setIterations(10)
 .setLocalIterations(10)
@@ -154,11 +154,11 @@ val cocoa = CoCoA()
 .setStepsize(0.5)
 
 // Learn the SVM model
-val svm = cocoa.fit(trainingDS)
+svm.fit(trainingDS)
 
 // Read the testing data set
 val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
 
 // Calculate the predictions for the testing data set
-val predictionDS: DataSet[LabeledVector] = model.transform(testingDS)
+val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
 {% endhighlight %}
\ No newline at end of file
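
Note that `predict` emits the raw decision value as the label, i.e. the scaled
distance from the separating hyperplane (see the PredictionMapper in the
CoCoA.scala hunk further down), not a class label. A hedged sketch of
thresholding it into +1/-1 classes (illustrative, not part of the commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.LabeledVector

    // threshold the raw SVM decision value at zero
    val classifiedDS = predictionDS.map { lv =>
      LabeledVector(if (lv.label >= 0.0) 1.0 else -1.0, lv.vector)
    }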

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/multiple_linear_regression.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/multiple_linear_regression.md b/docs/libs/ml/multiple_linear_regression.md
index 840e899..c11425d 100644
--- a/docs/libs/ml/multiple_linear_regression.md
+++ b/docs/libs/ml/multiple_linear_regression.md
@@ -117,8 +117,8 @@ val trainingDS: DataSet[LabeledVector] = ...
 val testingDS: DataSet[Vector] = ...
 
 // Fit the linear model to the provided data
-val model = mlr.fit(trainingDS)
+mlr.fit(trainingDS)
 
 // Calculate the predictions for the test data
-val predictions = model.transform(testingDS)
+val predictions = mlr.predict(testingDS)
 {% endhighlight %}
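
As in the ALS example above, a ParameterMap can still be supplied at fit time.
A short sketch; only MultipleLinearRegression.Stepsize is confirmed elsewhere
in this commit's docs, and the data sets are reused from the hunk:

    import org.apache.flink.ml.common.ParameterMap
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val mlr = MultipleLinearRegression()
    val parameters = ParameterMap().add(MultipleLinearRegression.Stepsize, 0.5)

    mlr.fit(trainingDS, parameters)
    val predictions = mlr.predict(testingDS)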

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/polynomial_base_feature_mapper.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/polynomial_base_feature_mapper.md b/docs/libs/ml/polynomial_base_feature_mapper.md
index 2964f04..801e3cf 100644
--- a/docs/libs/ml/polynomial_base_feature_mapper.md
+++ b/docs/libs/ml/polynomial_base_feature_mapper.md
@@ -84,8 +84,8 @@ val parameters = ParameterMap()
 .add(MultipleLinearRegression.Stepsize, 0.5)
 
 // Create pipeline PolynomialBase -> MultipleLinearRegression
-val chained = polyBase.chain(mlr)
+val pipeline = polyBase.chainPredictor(mlr)
 
 // Learn the model
-val model = chained.fit(trainingDS)
+pipeline.fit(trainingDS)
 {% endhighlight %}
\ No newline at end of file
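
Because the chained object is itself a predictor (see the new
ChainedPredictor.scala in the diffstat), it should also score unseen data
directly once fitted. A sketch, with `predict` on the pipeline assumed by
analogy with the single-predictor examples above:

    // fitting the pipeline fits the polynomial mapper and the regression in sequence
    pipeline.fit(trainingDS)

    // scoring routes each vector through the mapper before the regression
    val testingDS: DataSet[Vector] = ...
    val predictions = pipeline.predict(testingDS)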

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/docs/libs/ml/standard_scaler.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/standard_scaler.md b/docs/libs/ml/standard_scaler.md
index aae4620..2ea21d6 100644
--- a/docs/libs/ml/standard_scaler.md
+++ b/docs/libs/ml/standard_scaler.md
@@ -85,6 +85,9 @@ val scaler = StandardScaler()
 // Obtain data set to be scaled
 val dataSet: DataSet[Vector] = ...
 
+// Learn the mean and standard deviation of the training data
+scaler.fit(dataSet)
+
 // Scale the provided data set to have mean=10.0 and std=2.0
 val scaledDS = scaler.transform(dataSet)
 {% endhighlight %}
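
The added fit call decouples the learned statistics from the data they are
applied to, so one fitted scaler can transform several data sets consistently.
A sketch (the second data set is hypothetical):

    // learn mean/std once
    scaler.fit(dataSet)

    // apply the same learned statistics to both data sets
    val scaledTraining = scaler.transform(dataSet)
    val scaledTesting  = scaler.transform(heldOutDS)  // heldOutDS: DataSet[Vector], placeholder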

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-clients/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/assembly/test-assembly.xml b/flink-clients/src/test/assembly/test-assembly.xml
index 60b27d2..aa7b7d1 100644
--- a/flink-clients/src/test/assembly/test-assembly.xml
+++ b/flink-clients/src/test/assembly/test-assembly.xml
@@ -27,7 +27,7 @@ under the License.
 		<fileSet>
 			<directory>${project.build.testOutputDirectory}</directory>
 			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your pipeline(s) -->
+			<!--modify/add include to match your package(s) -->
 			<includes>
 				<include>org/apache/flink/client/testjar/**</include>
 			</includes>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index f7039c7..e131892 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
 /**
- * This class contains test for the configuration pipeline. In particular, the serialization of {@link Configuration}
+ * This class contains tests for the configuration package. In particular, the serialization of {@link Configuration}
  * objects is tested.
  */
 public class ConfigurationTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index d9dacf3..6a429ee 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -130,7 +130,7 @@ under the License.
 			</excludes>
 		</fileSet>
 		<fileSet>
-			<!-- copy python pipeline -->
+			<!-- copy python package -->
 			<directory>../flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python</directory>
 			<outputDirectory>resources/python/</outputDirectory>
 			<fileMode>0755</fileMode>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-examples/flink-java-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/pom.xml b/flink-examples/flink-java-examples/pom.xml
index 2a335b5..a7964cb 100644
--- a/flink-examples/flink-java-examples/pom.xml
+++ b/flink-examples/flink-java-examples/pom.xml
@@ -205,7 +205,7 @@ under the License.
 					<!--
 					<execution>
 						<id>TPCHQuery10</id>
-						<phase>pipeline</phase>
+						<phase>package</phase>
 						<goals>
 							<goal>jar</goal>
 						</goals>
@@ -228,7 +228,7 @@ under the License.
 					<!--
 					<execution>
 						<id>TPCHQuery3</id>
-						<phase>pipeline</phase>
+						<phase>package</phase>
 						<goals>
 							<goal>jar</goal>
 						</goals>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
index e66493a..9a6e58c 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery3.java
@@ -135,7 +135,7 @@ public class TPCHQuery3 {
 									}
 								});
 
-		// Join customers with orders and pipeline them into a ShippingPriorityItem
+		// Join customers with orders and package them into a ShippingPriorityItem
 		DataSet<ShippingPriorityItem> customerWithOrders = 
 				customers.join(orders).where(0).equalTo(1)
 							.with(

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-examples/flink-scala-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml
index 14fa874..5127c48 100644
--- a/flink-examples/flink-scala-examples/pom.xml
+++ b/flink-examples/flink-scala-examples/pom.xml
@@ -170,7 +170,7 @@ under the License.
 				</configuration>
 			</plugin>
 			
-			<!-- get default data from flink-java-examples pipeline -->
+			<!-- get default data from flink-java-examples package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>
@@ -357,7 +357,7 @@ under the License.
 					<!--
 					<execution>
 						<id>TPCHQuery10</id>
-						<phase>pipeline</phase>
+						<phase>package</phase>
 						<goals>
 							<goal>jar</goal>
 						</goals>
@@ -380,7 +380,7 @@ under the License.
 					<!--
 					<execution>
 						<id>TPCHQuery3</id>
-						<phase>pipeline</phase>
+						<phase>package</phase>
 						<goals>
 							<goal>jar</goal>
 						</goals>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
index 1472cd9..b433d66 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/aggregation/AvgAggregationFunction.java
@@ -17,7 +17,7 @@ package org.apache.flink.api.java.aggregation;
  * limitations under the License.
  */
 
-//pipeline org.apache.flink.api.java.aggregation;
+//package org.apache.flink.api.java.aggregation;
 //
 //
 //public abstract class AvgAggregationFunction<T> extends AggregationFunction<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
index 2149180..03826fc 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/tuple/TupleGenerator.java
@@ -524,8 +524,8 @@ class TupleGenerator {
 		// head
 		w.print(HEADER);
 
-		// pipeline and imports
-		w.println("pipeline " + PACKAGE + ';');
+		// package and imports
+		w.println("package " + PACKAGE + ';');
 		w.println();
 		w.println("import org.apache.flink.util.StringUtils;");
 		w.println();
@@ -780,8 +780,8 @@ class TupleGenerator {
 		// head
 		w.print(HEADER);
 
-		// pipeline and imports
-		w.println("pipeline " + PACKAGE + "." + BUILDER_SUFFIX + ';');
+		// package and imports
+		w.println("package " + PACKAGE + "." + BUILDER_SUFFIX + ';');
 		w.println();
 		w.println("import java.util.LinkedList;");
 		w.println("import java.util.List;");

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-java8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-java8/pom.xml b/flink-java8/pom.xml
index 5293221..0d82ea2 100644
--- a/flink-java8/pom.xml
+++ b/flink-java8/pom.xml
@@ -101,7 +101,7 @@ under the License.
 				</configuration>
 			</plugin>
 
-			<!-- get default data from flink-java-examples pipeline -->
+			<!-- get default data from flink-java-examples package -->
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
index d125475..cd8766c 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
@@ -17,7 +17,7 @@
  */
 
 /**
- * This pipeline contains the various traversals over the program plan and the
+ * This package contains the various traversals over the program plan and the
  * optimizer DAG (directed acyclic graph) that are made in the course of
  * the optimization.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index b1e3fac..30f2315 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -48,7 +48,7 @@ under the License.
 	
 	<!-- 
 		
-		Execute "mvn clean pipeline -Pbuild-jar"
+		Execute "mvn clean package -Pbuild-jar"
 		to build a jar file out of this project!
 
 		How to use the Flink Quickstart pom:
@@ -61,11 +61,11 @@ under the License.
 		b) Build a jar for running on the cluster:
 			There are two options for creating a jar from this project
 
-			b.1) "mvn clean pipeline" -> this will create a fat jar which contains all
+			b.1) "mvn clean package" -> this will create a fat jar which contains all
 					dependencies necessary for running the jar created by this pom in a cluster.
 					The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
 
-			b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much
+			b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
 					nicer dependency exclusion handling. This approach is preferred and leads to
 					much cleaner jar files.
 	-->
@@ -98,7 +98,7 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<version>2.3</version>
 				<executions>
-					<!-- Run shade goal on pipeline phase -->
+					<!-- Run shade goal on package phase -->
 					<execution>
 						<phase>package</phase>
 						<goals>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
index 6e813a4..603fc80 100644
--- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
+++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/src/main/java/Job.java
@@ -24,12 +24,12 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  * Skeleton for a Flink Job.
  *
  * For a full example of a Flink Job, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
  *
  * You can also generate a .jar file that you can submit on your Flink
  * cluster.
  * Just type
- * 		mvn clean pipeline
+ * 		mvn clean package
  * in the projects root directory.
  * You will find the jar in
  * 		target/flink-quickstart-0.1-SNAPSHOT-Sample.jar

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 299fa24..e940b90 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -49,7 +49,7 @@ under the License.
 
 	<!-- 
 		
-		Execute "mvn clean pipeline -Pbuild-jar"
+		Execute "mvn clean package -Pbuild-jar"
 		to build a jar file out of this project!
 
 		How to use the Flink Quickstart pom:
@@ -62,11 +62,11 @@ under the License.
 		b) Build a jar for running on the cluster:
 			There are two options for creating a jar from this project
 
-			b.1) "mvn clean pipeline" -> this will create a fat jar which contains all
+			b.1) "mvn clean package" -> this will create a fat jar which contains all
 					dependencies necessary for running the jar created by this pom in a cluster.
 					The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
 
-			b.2) "mvn clean pipeline -Pbuild-jar" -> This will also create a fat-jar, but with much
+			b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
 					nicer dependency exclusion handling. This approach is preferred and leads to
 					much cleaner jar files.
 	-->
@@ -102,7 +102,7 @@ under the License.
 				<artifactId>maven-shade-plugin</artifactId>
 				<version>2.3</version>
 				<executions>
-					<!-- Run shade goal on pipeline phase -->
+					<!-- Run shade goal on package phase -->
 					<execution>
 						<phase>package</phase>
 						<goals>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
index 44a7a03..3c34b0a 100644
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
+++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/Job.scala
@@ -24,12 +24,12 @@ import org.apache.flink.api.scala._
  * Skeleton for a Flink Job.
  *
  * For a full example of a Flink Job, see the WordCountJob.scala file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
  *
  * You can also generate a .jar file that you can submit on your Flink
  * cluster. Just type
  * {{{
- *   mvn clean pipeline
+ *   mvn clean package
  * }}}
  * in the projects root directory. You will find the jar in
  * target/flink-quickstart-0.1-SNAPSHOT-Sample.jar

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
index e324420..cf7474e 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/LocalJob.java
@@ -24,12 +24,12 @@ import org.apache.flink.tez.client.LocalTezEnvironment;
  * Skeleton for a Flink on Tez Job running using Tez local mode.
  *
  * For a full example of a Flink  on TezJob, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
  *
  * You can also generate a .jar file that you can submit on your Flink
  * cluster.
  * Just type
- * 		mvn clean pipeline
+ * 		mvn clean package
  * in the projects root directory.
  * You will find the jar in
  * 		target/flink-quickstart-0.1-SNAPSHOT-Sample.jar

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
----------------------------------------------------------------------
diff --git a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
index 1b0bbcf..51627d5 100644
--- a/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
+++ b/flink-quickstart/flink-tez-quickstart/src/main/resources/archetype-resources/src/main/java/YarnJob.java
@@ -24,12 +24,12 @@ import org.apache.flink.tez.client.RemoteTezEnvironment;
  * Skeleton for a Flink on Tez program running on Yarn.
  *
  * For a full example of a Flink on Tez program, see the WordCountJob.java file in the
- * same pipeline/directory or have a look at the website.
+ * same package/directory or have a look at the website.
  *
  * You can also generate a .jar file that you can submit on your Flink
  * cluster.
  * Just type
- * 		mvn clean pipeline
+ * 		mvn clean package
  * in the projects root directory.
  * You will find the jar in
  * 		target/flink-quickstart-0.1-SNAPSHOT-Sample.jar

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
index 7e422b2..7b96b81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/package-info.java
@@ -17,7 +17,7 @@
  */
 
 /**
- * This pipeline contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
+ * This package contains the messages that are sent between {@link org.apache.flink.runtime.jobmanager.JobManager}
  * and {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the checkpoint snapshots of the
  * distributed dataflow.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
index 78620a8..e0b8cce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/package-info.java
@@ -17,7 +17,7 @@
  */
 
 /**
- * This pipeline contains the messages that are sent between actors, like the
+ * This package contains the messages that are sent between actors, like the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} and
  * {@link org.apache.flink.runtime.taskmanager.TaskManager} to coordinate the distributed operations.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
index 9e39777..c55a9dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JarFileCreator.java
@@ -91,10 +91,10 @@ public class JarFileCreator {
 	}
 
 	/**
-	 * Manually specify the pipeline of the dependencies.
+	 * Manually specify the package of the dependencies.
 	 *
 	 * @param p
-	 * 		  the pipeline to be included.
+	 * 		  the package to be included.
 	 */
 	public synchronized JarFileCreator addPackage(String p) {
 		this.packages.add(p);

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
index 2e39068..ba207ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
@@ -199,7 +199,7 @@ public class JarFileCreatorTest {
 		ans.add("org/apache/flink/runtime/util/jartestprogram/WordCountWithInnerClass$Tokenizer.class");
 		ans.add("org/apache/flink/util/Collector.class");
 
-		Assert.assertTrue("Jar file for UDF pipeline is not correct", validate(ans, out));
+		Assert.assertTrue("Jar file for UDF package is not correct", validate(ans, out));
 
 		out.delete();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
index e146687..e283e95 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala
@@ -67,7 +67,7 @@ import scala.reflect.ClassTag
  *
  * A rich function can be used when more control is required, for example for accessing the
  * `RuntimeContext`. The rich function for `flatMap` is `RichFlatMapFunction`, all other functions
- * are named similarly. All functions are available in pipeline
+ * are named similarly. All functions are available in package
  * `org.apache.flink.api.common.functions`.
  *
  * The elements are partitioned depending on the parallelism of the

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
index 86563b0..0f4561a 100644
--- a/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
+++ b/flink-staging/flink-avro/src/test/assembly/test-assembly.xml
@@ -27,7 +27,7 @@ under the License.
 		<fileSet>
 			<directory>${project.build.testOutputDirectory}</directory>
 			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your pipeline(s) -->
+			<!--modify/add include to match your package(s) -->
 			<includes>
 				<include>org/apache/flink/api/avro/testjar/**</include>
 			</includes>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
index b9c28c5..ce4955c 100644
--- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
+++ b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.util.StringUtils;
 
 /**
  * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext}
- * as input parameter. However JobContext class is pipeline private, and in Hadoop 2.2.0 it's public.
+ * as input parameter. However, the JobContext class is package private, and in Hadoop 2.2.0 it's public.
  * This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks.
  */
 public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
index 7d180b6..c278f5c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
@@ -125,14 +125,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 	//=====Setup========================================================================================================
 	/**
 	 * Copies all files to a common directory (FLINK_PYTHON_FILE_PATH). This allows us to distribute it as one big
-	 * pipeline, and resolves PYTHONPATH issues.
+	 * package, and resolves PYTHONPATH issues.
 	 *
 	 * @param filePaths
 	 * @throws IOException
 	 * @throws URISyntaxException
 	 */
 	private void prepareFiles(String... filePaths) throws IOException, URISyntaxException {
-		//Flink python pipeline
+		//Flink python package
 		String tempFilePath = FLINK_PYTHON_FILE_PATH;
 		clearPath(tempFilePath);
 		FileCache.copy(new Path(FULL_PATH), new Path(tempFilePath), false);

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/pom.xml b/flink-staging/flink-ml/pom.xml
index bc61e2e..43e6605 100644
--- a/flink-staging/flink-ml/pom.xml
+++ b/flink-staging/flink-ml/pom.xml
@@ -117,7 +117,7 @@
 						</goals>
 						<configuration>
 							<suffixes>(?&lt;!(IT|Integration))(Test|Suite|Case)</suffixes>
-							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -XX:-UseGCOverheadLimit</argLine>
+							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
 						</configuration>
 					</execution>
 					<execution>
@@ -128,7 +128,7 @@
 						</goals>
 						<configuration>
 							<suffixes>(IT|Integration)(Test|Suite|Case)</suffixes>
-							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -XX:-UseGCOverheadLimit</argLine>
+							<argLine>-Xms256m -Xmx800m -Dlog4j.configuration=${log4j.configuration} -Dmvn.forkNumber=${surefire.forkNumber} -XX:-UseGCOverheadLimit</argLine>
 						</configuration>
 					</execution>
 				</executions>

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
index e1c2053..4ba9299 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.ml.classification
 
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
@@ -66,60 +68,63 @@ import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
   *          {{{
   *             val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
   *
-  *             val cocoa = CoCoA()
+  *             val svm = CoCoA()
   *               .setBlocks(10)
   *               .setIterations(10)
   *               .setLocalIterations(10)
   *               .setRegularization(0.5)
   *               .setStepsize(0.5)
   *
-  *             val svm = cocoa.fit(trainingDS)
+  *             svm.fit(trainingDS)
   *
   *             val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
   *
-  *             val predictionDS: DataSet[LabeledVector] = model.transform(testingDS)
+  *             val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
   *          }}}
   *
   * =Parameters=
   *
-  *  - [[CoCoA.Blocks]]:
+  *  - [[org.apache.flink.ml.classification.CoCoA.Blocks]]:
   *  Sets the number of blocks into which the input data will be split. On each block the local
   *  stochastic dual coordinate ascent method is executed. This number should be set at least to
   *  the degree of parallelism. If no value is specified, then the parallelism of the input
   *  [[DataSet]] is used as the number of blocks. (Default value: '''None''')
   *
-  *  - [[CoCoA.Iterations]]:
+  *  - [[org.apache.flink.ml.classification.CoCoA.Iterations]]:
   *  Defines the maximum number of iterations of the outer loop method. In other words, it defines
   *  how often the SDCA method is applied to the blocked data. After each iteration, the locally
   *  computed weight vector updates have to be reduced to update the global weight vector value.
   *  The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
   *  (Default value: '''10''')
   *
-  *  - [[CoCoA.LocalIterations]]:
+  *  - [[org.apache.flink.ml.classification.CoCoA.LocalIterations]]:
   *  Defines the maximum number of SDCA iterations. In other words, it defines how many data points
   *  are drawn from each local data block to calculate the stochastic dual coordinate ascent.
   *  (Default value: '''10''')
   *
-  *  - [[CoCoA.Regularization]]:
+  *  - [[org.apache.flink.ml.classification.CoCoA.Regularization]]:
   *  Defines the regularization constant of the CoCoA algorithm. The higher the value, the smaller
   *  will the 2-norm of the weight vector be. In case of a SVM with hinge loss this means that the
   *  SVM margin will be wider even though it might contain some false classifications.
   *  (Default value: '''1.0''')
   *
-  *  - [[CoCoA.Stepsize]]:
+  *  - [[org.apache.flink.ml.classification.CoCoA.Stepsize]]:
   *  Defines the initial step size for the updates of the weight vector. The larger the step size
   *  is, the larger will be the contribution of the weight vector updates to the next weight vector
   *  value. The effective scaling of the updates is `stepsize/blocks`. This value has to be tuned
   *  in case that the algorithm becomes instable. (Default value: '''1.0''')
   *
-  *  - [[CoCoA.Seed]]:
+  *  - [[org.apache.flink.ml.classification.CoCoA.Seed]]:
   *  Defines the seed to initialize the random number generator. The seed directly controls which
   *  data points are chosen for the SDCA method. (Default value: '''0''')
   */
-class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
+class CoCoA extends Predictor[CoCoA] {
 
   import CoCoA._
 
+  /** Stores the learned weight vector after the fit operation */
+  var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
+
   /** Sets the number of data blocks/partitions
     *
     * @param blocks
@@ -179,73 +184,168 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
     parameters.add(Seed, seed)
     this
   }
+}
 
-  /** Trains a SVM with soft-margin based on the given training data set.
+/** Companion object of CoCoA. Contains convenience functions and the parameter type definitions
+  * of the algorithm.
+  */
+object CoCoA {
+  val WEIGHT_VECTOR = "weightVector"
+
+  // ========================================== Parameters =========================================
+
+  case object Blocks extends Parameter[Int] {
+    val defaultValue: Option[Int] = None
+  }
+
+  case object Iterations extends Parameter[Int] {
+    val defaultValue = Some(10)
+  }
+
+  case object LocalIterations extends Parameter[Int] {
+    val defaultValue = Some(10)
+  }
+
+  case object Regularization extends Parameter[Double] {
+    val defaultValue = Some(1.0)
+  }
+
+  case object Stepsize extends Parameter[Double] {
+    val defaultValue = Some(1.0)
+  }
+
+  case object Seed extends Parameter[Long] {
+    val defaultValue = Some(0L)
+  }
+
+  // ========================================== Factory methods ====================================
+
+  def apply(): CoCoA = {
+    new CoCoA()
+  }
+
+  // ========================================== Operations =========================================
+
+  /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
+    * [[LabeledVector]]
     *
-    * @param input Training data set
-    * @param fitParameters Parameter values
-    * @return Trained SVM model
+    * @tparam T Subtype of [[Vector]]
+    * @return
+    */
+  implicit def predictValues[T <: Vector] = {
+    new PredictOperation[CoCoA, T, LabeledVector]{
+      override def predict(
+          instance: CoCoA,
+          predictParameters: ParameterMap,
+          input: DataSet[T])
+        : DataSet[LabeledVector] = {
+
+        instance.weightsOption match {
+          case Some(weights) => {
+            input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
+          }
+        }
+      }
+    }
+  }
+
+  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
+    * we broadcast the weight vector to all mappers.
     */
-  override def fit(input: DataSet[LabeledVector], fitParameters: ParameterMap): CoCoAModel = {
-    val resultingParameters = this.parameters ++ fitParameters
+  class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
 
-    // Check if the number of blocks/partitions has been specified
-    val blocks = resultingParameters.get(Blocks) match {
-      case Some(value) => value
-      case None => input.getParallelism
+    var weights: BreezeDenseVector[Double] = _
+
+    @throws(classOf[Exception])
+    override def open(configuration: Configuration): Unit = {
+      // get current weights
+      weights = getRuntimeContext.
+        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
     }
 
-    val scaling = resultingParameters(Stepsize)/blocks
-    val iterations = resultingParameters(Iterations)
-    val localIterations = resultingParameters(LocalIterations)
-    val regularization = resultingParameters(Regularization)
-    val seed = resultingParameters(Seed)
-
-    // Obtain DataSet with the dimension of the data points
-    val dimension = input.map{_.vector.size}.reduce{
-      (a, b) => {
-        require(a == b, "Dimensions of feature vectors have to be equal.")
-        a
-      }
+    override def map(vector: T): LabeledVector = {
+      // calculate the prediction value (scaled distance from the separating hyperplane)
+      val dotProduct = weights dot vector.asBreeze
+
+      LabeledVector(dotProduct, vector)
     }
+  }
 
-    val initialWeights = createInitialWeights(dimension)
-
-    // Count the number of vectors, but keep the value in a DataSet to broadcast it later
-    // TODO: Once efficient count and intermediate result partitions are implemented, use count
-    val numberVectors = input map { x => 1 } reduce { _ + _ }
-
-    // Group the input data into blocks in round robin fashion
-    val blockedInputNumberElements = FlinkTools.block(input, blocks, Some(ModuloKeyPartitioner)).
-      cross(numberVectors).
-      map { x => x }
-
-    val resultingWeights = initialWeights.iterate(iterations) {
-      weights => {
-        // compute the local SDCA to obtain the weight vector updates
-        val deltaWs = localDualMethod(
-          weights,
-          blockedInputNumberElements,
-          localIterations,
-          regularization,
-          scaling,
-          seed
-        )
-
-        // scale the weight vectors
-        val weightedDeltaWs = deltaWs map {
-          deltaW => {
-            deltaW :*= scaling
+  /** [[FitOperation]] which trains a soft-margin SVM on the given training data set.
+    *
+    */
+  implicit val fitCoCoA = {
+    new FitOperation[CoCoA, LabeledVector] {
+      override def fit(
+          instance: CoCoA,
+          fitParameters: ParameterMap,
+          input: DataSet[LabeledVector])
+        : Unit = {
+        val resultingParameters = instance.parameters ++ fitParameters
+
+        // Check if the number of blocks/partitions has been specified
+        val blocks = resultingParameters.get(Blocks) match {
+          case Some(value) => value
+          case None => input.getParallelism
+        }
+
+        val scaling = resultingParameters(Stepsize)/blocks
+        val iterations = resultingParameters(Iterations)
+        val localIterations = resultingParameters(LocalIterations)
+        val regularization = resultingParameters(Regularization)
+        val seed = resultingParameters(Seed)
+
+        // Obtain DataSet with the dimension of the data points
+        val dimension = input.map{_.vector.size}.reduce{
+          (a, b) => {
+            require(a == b, "Dimensions of feature vectors have to be equal.")
+            a
           }
         }
 
-        // calculate the new weight vector by adding the weight vector updates to the weight vector
-        // value
-        weights.union(weightedDeltaWs).reduce { _ + _ }
+        val initialWeights = createInitialWeights(dimension)
+
+        // Count the number of vectors, but keep the value in a DataSet to broadcast it later
+        // TODO: Once efficient count and intermediate result partitions are implemented, use count
+        val numberVectors = input map { x => 1 } reduce { _ + _ }
+
+        // Group the input data into blocks in round robin fashion
+        val blockedInputNumberElements = FlinkTools.block(
+          input,
+          blocks,
+          Some(ModuloKeyPartitioner)).
+          cross(numberVectors).
+          map { x => x }
+
+        val resultingWeights = initialWeights.iterate(iterations) {
+          weights => {
+            // compute the local SDCA to obtain the weight vector updates
+            val deltaWs = localDualMethod(
+              weights,
+              blockedInputNumberElements,
+              localIterations,
+              regularization,
+              scaling,
+              seed
+            )
+
+            // scale the weight vectors
+            val weightedDeltaWs = deltaWs map {
+              deltaW => {
+                deltaW :*= scaling
+              }
+            }
+
+            // calculate the new weight vector by adding the weight vector updates to the weight
+            // vector value
+            weights.union(weightedDeltaWs).reduce { _ + _ }
+          }
+        }
+
+        // Store the learned weight vector in the given instance
+        instance.weightsOption = Some(resultingWeights)
       }
     }
-
-    CoCoAModel(resultingWeights)
   }
 
   /** Creates a zero vector of length dimension
@@ -270,13 +370,13 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
     * @return [[DataSet]] of weight vector updates. The weight vector updates are double arrays
     */
   private def localDualMethod(
-      w: DataSet[BreezeDenseVector[Double]],
-      blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
-      localIterations: Int,
-      regularization: Double,
-      scaling: Double,
-      seed: Long)
-    : DataSet[BreezeDenseVector[Double]] = {
+    w: DataSet[BreezeDenseVector[Double]],
+    blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
+    localIterations: Int,
+    regularization: Double,
+    scaling: Double,
+    seed: Long)
+  : DataSet[BreezeDenseVector[Double]] = {
     /*
     Rich mapper calculating for each data block the local SDCA. We use a RichMapFunction here,
     because we broadcast the current value of the weight vector to all mappers.
@@ -300,7 +400,7 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
       }
 
       override def map(blockNumberElements: (Block[LabeledVector], Int))
-        : BreezeDenseVector[Double] = {
+      : BreezeDenseVector[Double] = {
         val (block, numberElements) = blockNumberElements
 
         // check if we already processed a data block with the corresponding block index
@@ -406,87 +506,5 @@ class CoCoA extends Learner[LabeledVector, CoCoAModel] with Serializable {
       (0.0 , BreezeVector.zeros(w.length))
     }
   }
-}
-
-/** Companion object of CoCoA. Contains convenience functions and the parameter type definitions
-  * of the algorithm.
-  */
-object CoCoA{
-  val WEIGHT_VECTOR ="weightVector"
-
-  case object Blocks extends Parameter[Int] {
-    val defaultValue: Option[Int] = None
-  }
-
-  case object Iterations extends Parameter[Int] {
-    val defaultValue = Some(10)
-  }
-
-  case object LocalIterations extends Parameter[Int] {
-    val defaultValue = Some(10)
-  }
-
-  case object Regularization extends Parameter[Double] {
-    val defaultValue = Some(1.0)
-  }
-
-  case object Stepsize extends Parameter[Double] {
-    val defaultValue = Some(1.0)
-  }
-
-  case object Seed extends Parameter[Long] {
-    val defaultValue = Some(0L)
-  }
-
-  def apply(): CoCoA = {
-    new CoCoA()
-  }
-}
-
-/** Resulting SVM model calculated by the CoCoA algorithm.
-  *
-   * @param weights Calculated weight vector representing the separating hyperplane of the
-  *                classification task.
-  */
-case class CoCoAModel(weights: DataSet[BreezeDenseVector[Double]])
-  extends Transformer[Vector, LabeledVector]
-  with Serializable {
-  import CoCoA.WEIGHT_VECTOR
-
-  /** Calculates the prediction value of the SVM value (not the label)
-    *
-    * @param input [[DataSet]] containing the vector for which to calculate the predictions
-    * @param parameters Parameter values for the algorithm
-    * @return [[DataSet]] containing the labeled vectors
-    */
-  override def transform(input: DataSet[Vector], parameters: ParameterMap):
-  DataSet[LabeledVector] = {
-    input.map(new PredictionMapper).withBroadcastSet(weights, WEIGHT_VECTOR)
-  }
-}
-
-/** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
-  * we broadcast the weight vector to all mappers.
-  */
-class PredictionMapper extends RichMapFunction[Vector, LabeledVector] {
-
-  import CoCoA.WEIGHT_VECTOR
-
-  var weights: BreezeDenseVector[Double] = _
-
-  @throws(classOf[Exception])
-  override def open(configuration: Configuration): Unit = {
-    // get current weights
-    weights = getRuntimeContext.
-      getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
-  }
-
-  override def map(vector: Vector): LabeledVector = {
-    // calculate the prediction value (scaled distance from the separating hyperplane)
-    val dotProduct = weights dot vector.asBreeze
 
-    LabeledVector(dotProduct, vector)
-  }
 }
-
-

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
deleted file mode 100644
index cf1b51a..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedLearner.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** This class represents a [[org.apache.flink.ml.common.Learner]] which is chained to a
-  * [[Transformer]].
-  *
-  * Calling the method `fit` on this object will pipe the input data through the given
-  * [[Transformer]], whose output is fed to the [[Learner]].
-  *
-  * @param head Preceding [[Transformer]] pipeline
-  * @param tail [[Learner]] instance
-  * @tparam IN Type of the training data
-  * @tparam TEMP Type of the data produced by the transformer pipeline, which is also the input
-  *              type of the [[Learner]]
-  * @tparam OUT Type of the trained model
-  */
-class ChainedLearner[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
-                                    val tail: Learner[TEMP, OUT])
-  extends Learner[IN, OUT] {
-
-  override def fit(input: DataSet[IN], fitParameters: ParameterMap): OUT = {
-    val tempResult = head.transform(input, fitParameters)
-
-    tail.fit(tempResult, fitParameters)
-  }
-}
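
A sketch of how a ChainedLearner was obtained in practice via the `chain` method on
Transformer, assuming a hypothetical `preprocessor: Transformer[LabeledVector, LabeledVector]`
and that CoCoA implements Learner[LabeledVector, CoCoAModel]:

    // fit pipes trainingDS through the preprocessor before training CoCoA on the result
    val pipeline: ChainedLearner[LabeledVector, LabeledVector, CoCoAModel] =
      preprocessor.chain(CoCoA())

    val model = pipeline.fit(trainingDS)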

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
deleted file mode 100644
index 0658876..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/ChainedTransformer.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** This class represents a chain of multiple [[Transformer]]s.
-  *
-  * Calling the method `transform` on this object will first apply the preceding [[Transformer]] to
-  * the input data. The resulting output data is then fed to the succeeding [[Transformer]].
-  *
-  * @param head Preceding [[Transformer]]
-  * @param tail Succeeding [[Transformer]]
-  * @tparam IN Type of incoming elements
-  * @tparam TEMP Type of output elements of the preceding [[Transformer]] and input type of
-  *              succeeding [[Transformer]]
-  * @tparam OUT Type of outgoing elements
-  */
-class ChainedTransformer[IN, TEMP, OUT](val head: Transformer[IN, TEMP],
-                                        val tail: Transformer[TEMP, OUT])
-  extends Transformer[IN, OUT] {
-
-  override def transform(input: DataSet[IN], transformParameters: ParameterMap): DataSet[OUT] = {
-    val tempResult = head.transform(input, transformParameters)
-    tail.transform(tempResult, transformParameters)
-  }
-}
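
A sketch of the chaining semantics with two ad-hoc transformers over DataSet[Double], using
the `chain` method from the Transformer trait further below and assuming
`import org.apache.flink.api.scala._` plus an ExecutionEnvironment `env`:

    val addOne = new Transformer[Double, Double] {
      override def transform(input: DataSet[Double], transformParameters: ParameterMap) =
        input.map(_ + 1.0)
    }

    val square = new Transformer[Double, Double] {
      override def transform(input: DataSet[Double], transformParameters: ParameterMap) =
        input.map(x => x * x)
    }

    // equivalent to square.transform(addOne.transform(data)): yields 4.0, 9.0, 16.0
    val result = addOne.chain(square).transform(env.fromElements(1.0, 2.0, 3.0))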

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
deleted file mode 100644
index a081f76..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Learner.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** Base trait for an algorithm which trains a model based on some training data.
-  *
-  * The idea is that all algorithms which train a model implement this trait. That way
-  * they can be chained with [[Transformer]]s, which act as preprocessing steps for the actual
-  * learning. In that sense, a [[Learner]] denotes the end of a pipeline and cannot be further
-  * chained.
-  *
-  * Every learner has to implement the `fit` method which takes the training data and learns
-  * a model from the data.
-  *
-  * @tparam IN Type of the training data
-  * @tparam OUT Type of the trained model
-  */
-trait Learner[IN, OUT] extends WithParameters {
-  def fit(input: DataSet[IN], fitParameters: ParameterMap = ParameterMap.Empty): OUT
-}
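
A minimal sketch of a custom Learner, here one whose "model" is simply the maximum of the
training data (assuming `import org.apache.flink.api.scala._`):

    class MaxLearner extends Learner[Double, DataSet[Double]] {
      override def fit(input: DataSet[Double], fitParameters: ParameterMap): DataSet[Double] =
        input.reduce((a, b) => math.max(a, b))
    }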

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
deleted file mode 100644
index 6b8780d..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/common/Transformer.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.common
-
-import org.apache.flink.api.scala.DataSet
-
-/** Base trait for an algorithm which transforms the input data to some output data.
-  *
-  * A [[Transformer]] is used to transform input data into some output data. Transformations
-  * might be feature extraction, feature mapping, whitening or centering, just to name a few.
-  *
-  * A [[Transformer]] can be chained with other [[Transformer]]s, creating a
-  * [[ChainedTransformer]] which again can be chained. Chaining a [[Transformer]] with a
-  * [[Learner]] creates a [[ChainedLearner]], which terminates a pipeline.
-  *
-  * A [[Transformer]] implementation has to implement the method `transform`, which defines how
-  * the input data is transformed into the output data.
-  *
-  * @tparam IN Type of incoming elements
-  * @tparam OUT Type of outgoing elements
-  */
-trait Transformer[IN, OUT] extends WithParameters {
-  def chain[CHAINED](transformer: Transformer[OUT, CHAINED]):
-  ChainedTransformer[IN, OUT, CHAINED] = {
-    new ChainedTransformer[IN, OUT, CHAINED](this, transformer)
-  }
-
-  def chain[CHAINED](learner: Learner[OUT, CHAINED]): ChainedLearner[IN, OUT, CHAINED] = {
-    new ChainedLearner[IN, OUT, CHAINED](this, learner)
-  }
-
-  def transform(input: DataSet[IN], transformParameters: ParameterMap = ParameterMap.Empty):
-  DataSet[OUT]
-}
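
A sketch combining both chain variants: two scaling transformers followed by the hypothetical
MaxLearner from the sketch above, which terminates the pipeline (`data: DataSet[Double]`
assumed):

    class Scale(factor: Double) extends Transformer[Double, Double] {
      override def transform(input: DataSet[Double], transformParameters: ParameterMap) =
        input.map(_ * factor)
    }

    // Transformer.chain(Transformer) nests further, Transformer.chain(Learner) terminates
    val model = new Scale(2.0).chain(new Scale(0.5)).chain(new MaxLearner).fit(data)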

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala
deleted file mode 100644
index 2f36e8c..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedPredictor.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-
-case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P)
-  extends Predictor[ChainedPredictor[T, P]]
-
-object ChainedPredictor{
-  implicit def chainedPredictOperation[
-      T <: Transformer[T],
-      P <: Predictor[P],
-      Input,
-      Testing,
-      Prediction](
-      implicit transform: TransformOperation[T, Input, Testing],
-      predictor: PredictOperation[P, Testing, Prediction])
-    : PredictOperation[ChainedPredictor[T, P], Input, Prediction] = {
-
-    new PredictOperation[ChainedPredictor[T, P], Input, Prediction] {
-      override def predict(
-          instance: ChainedPredictor[T, P],
-          predictParameters: ParameterMap,
-          input: DataSet[Input])
-        : DataSet[Prediction] = {
-
-        val testing = instance.transformer.transform(input, predictParameters)
-        instance.predictor.predict(testing, predictParameters)
-      }
-    }
-  }
-
-  implicit def chainedFitOperation[L <: Transformer[L], R <: Predictor[R], I, T](implicit
-    leftFitOperation: FitOperation[L, I],
-    leftTransformOperation: TransformOperation[L, I, T],
-    rightFitOperation: FitOperation[R, T]): FitOperation[ChainedPredictor[L, R], I] = {
-    new FitOperation[ChainedPredictor[L, R], I] {
-      override def fit(
-          instance: ChainedPredictor[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[I])
-        : Unit = {
-        instance.transformer.fit(input, fitParameters)
-        val intermediateResult = instance.transformer.transform(input, fitParameters)
-        instance.predictor.fit(intermediateResult, fitParameters)
-      }
-    }
-  }
-}
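
In the experimental mechanism the chaining logic lives in implicit operations rather than in
the pipeline classes themselves. A sketch of how predict resolves through
chainedPredictOperation, using the Scaler and KMeans stubs further below and assuming
`testData: DataSet[DenseVector]` plus the implicit CanCopy/TypeInformation instances required
by Scaler's transform operation:

    // scales the vectors first, then labels them with the dummy KMeans predictor
    val pipeline = ChainedPredictor(new Scaler, new KMeans)
    val predictions: DataSet[LabeledVector] = pipeline.predict(testData)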

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala
deleted file mode 100644
index dc9c611..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/ChainedTransformer.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-
-case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L, right: R)
-  extends Transformer[ChainedTransformer[L, R]]
-
-object ChainedTransformer{
-  implicit def chainedTransformOperation[
-      L <: Transformer[L],
-      R <: Transformer[R],
-      I,
-      T,
-      O](implicit
-      transformLeft: TransformOperation[L, I, T],
-      transformRight: TransformOperation[R, T, O])
-    : TransformOperation[ChainedTransformer[L,R], I, O] = {
-
-    new TransformOperation[ChainedTransformer[L, R], I, O] {
-      override def transform(
-          chain: ChainedTransformer[L, R],
-          transformParameters: ParameterMap,
-          input: DataSet[I]): DataSet[O] = {
-        val intermediateResult = transformLeft.transform(chain.left, transformParameters, input)
-        transformRight.transform(chain.right, transformParameters, intermediateResult)
-      }
-    }
-  }
-
-  implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit
-      leftFitOperation: FitOperation[L, I],
-      leftTransformOperation: TransformOperation[L, I, T],
-      rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = {
-    new FitOperation[ChainedTransformer[L, R], I] {
-      override def fit(
-          instance: ChainedTransformer[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[I]): Unit = {
-        instance.left.fit(input, fitParameters)
-        val intermediateResult = instance.left.transform(input, fitParameters)
-        instance.right.fit(intermediateResult, fitParameters)
-      }
-    }
-  }
-}
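
A sketch of the transform path: chainedTransformOperation simply composes the two
TransformOperations, here with the Scaler and Offset stubs below
(`vectorData: DataSet[DenseVector]` assumed):

    // scale every vector, then add the constant offset of 1.0 to each component
    val chain = ChainedTransformer(new Scaler, new Offset)
    val transformed: DataSet[DenseVector] = chain.transform(vectorData)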

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala
deleted file mode 100644
index e0c81a4..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Estimator.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{ParameterMap, WithParameters}
-
-trait Estimator[Self] extends WithParameters with Serializable {
-  that: Self =>
-
-  def fit[Training](
-      input: DataSet[Training],
-      fitParameters: ParameterMap = ParameterMap.Empty)(implicit
-      fitOperation: FitOperation[Self, Training]): Unit = {
-    fitOperation.fit(this, fitParameters, input)
-  }
-}
-
-object Estimator{
-  implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag]
-    : FitOperation[Self, Training] = {
-    new FitOperation[Self, Training]{
-      override def fit(
-          instance: Self,
-          fitParameters: ParameterMap,
-          input: DataSet[Training])
-        : Unit = {
-        val self = implicitly[ClassTag[Self]]
-        val training = implicitly[ClassTag[Training]]
-
-        throw new RuntimeException("There is no FitOperation defined for " + self.runtimeClass +
-          " which trains on a DataSet[" + training.runtimeClass + "]")
-      }
-    }
-  }
-
-  implicit def fallbackChainedFitOperationTransformer[
-      L <: Transformer[L],
-      R <: Transformer[R],
-      LI,
-      LO,
-      RI](implicit
-      leftFitOperation: FitOperation[L, LI],
-      leftTransformOperation: TransformOperation[L, LI, LO],
-      rightFitOperation: FitOperation[R, RI])
-    : FitOperation[ChainedTransformer[L, R], LI] = {
-    new FitOperation[ChainedTransformer[L, R], LI] {
-      override def fit(
-          instance: ChainedTransformer[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[LI]): Unit = {
-        instance.left.fit(input, fitParameters)
-        instance.left.transform(input, fitParameters)
-        instance.right.fit(null, fitParameters)
-      }
-    }
-  }
-
-  implicit def fallbackChainedFitOperationPredictor[
-      L <: Transformer[L],
-      R <: Predictor[R],
-      LI,
-      LO,
-      RI](implicit
-      leftFitOperation: FitOperation[L, LI],
-      leftTransformOperation: TransformOperation[L, LI, LO],
-      rightFitOperation: FitOperation[R, RI])
-    : FitOperation[ChainedPredictor[L, R], LI] = {
-    new FitOperation[ChainedPredictor[L, R], LI] {
-      override def fit(
-          instance: ChainedPredictor[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[LI]): Unit = {
-        instance.transformer.fit(input, fitParameters)
-        instance.transformer.transform(input, fitParameters)
-        instance.predictor.fit(null, fitParameters)
-      }
-    }
-  }
-}
-
-trait FitOperation[Self, Training]{
-  def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit
-}
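
A sketch of wiring a custom estimator into this mechanism: the implicit FitOperation in the
companion object is what the Estimator.fit call picks up (assuming
`import org.apache.flink.api.scala._` and a hypothetical `doubleData: DataSet[Double]`):

    class MeanEstimator extends Estimator[MeanEstimator] {
      var mean: Option[DataSet[Double]] = None
    }

    object MeanEstimator {
      implicit val meanFit: FitOperation[MeanEstimator, Double] =
        new FitOperation[MeanEstimator, Double] {
          override def fit(
              instance: MeanEstimator,
              fitParameters: ParameterMap,
              input: DataSet[Double]): Unit = {
            // store the mean of the training data as the estimator's state
            val sumAndCount = input
              .map(x => (x, 1L))
              .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
            instance.mean = Some(sumAndCount.map(p => p._1 / p._2))
          }
        }
    }

    val estimator = new MeanEstimator
    estimator.fit(doubleData) // resolves MeanEstimator.meanFit implicitly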

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala
deleted file mode 100644
index 5acd34f..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/KMeans.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.common.{ParameterMap, LabeledVector}
-import org.apache.flink.ml.math._
-
-class KMeans extends Predictor[KMeans]
-
-object KMeans{
-
-  implicit val kMeansEstimator = new FitOperation[KMeans, LabeledVector] {
-    override def fit(
-        instance: KMeans,
-        parameters: ParameterMap,
-        input: DataSet[LabeledVector]): Unit = {
-      input.print
-    }
-  }
-
-  implicit def kMeansPredictor[V <: Vector]
-    = new PredictOperation[KMeans, V, LabeledVector] {
-    override def predict(
-        instance: KMeans,
-        parameters: ParameterMap,
-        input: DataSet[V]): DataSet[LabeledVector] = {
-      input.map{
-        vector => LabeledVector(1.0, vector)
-      }
-    }
-  }
-}
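
A sketch of the call sites this stub supports (`trainingData: DataSet[LabeledVector]` and
`vectorData: DataSet[DenseVector]` assumed):

    val kMeans = new KMeans
    kMeans.fit(trainingData)                 // kMeansEstimator: prints the training data
    val labeled = kMeans.predict(vectorData) // kMeansPredictor: labels every vector with 1.0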

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala
deleted file mode 100644
index c9d082f..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Offset.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.math._
-
-class Offset extends Transformer[Offset]
-
-object Offset{
-  import Breeze._
-
-  implicit def offsetTransform[I <: Vector : CanCopy: ClassTag: TypeInformation]
-    = new TransformOperation[Offset, I, I] {
-    override def transform(
-        offset: Offset,
-        parameters: ParameterMap,
-        input: DataSet[I]): DataSet[I] = {
-      input.map{
-        vector =>
-          val brz = copy(vector).asBreeze
-
-          val result = brz + 1.0
-
-          result.fromBreeze.asInstanceOf[I]
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala
deleted file mode 100644
index 8ec6665..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Predictor.scala
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{ParameterMap, WithParameters}
-
-trait Predictor[Self] extends Estimator[Self] with WithParameters with Serializable {
-  that: Self =>
-
-  def predict[Testing, Prediction](
-      input: DataSet[Testing],
-      predictParameters: ParameterMap = ParameterMap.Empty)(implicit
-      predictor: PredictOperation[Self, Testing, Prediction])
-    : DataSet[Prediction] = {
-    predictor.predict(this, predictParameters, input)
-  }
-}
-
-object Predictor{
-  implicit def fallbackPredictOperation[Self: ClassTag, Testing: ClassTag, Prediction: ClassTag]
-    : PredictOperation[Self, Testing, Prediction] = {
-    new PredictOperation[Self, Testing, Prediction] {
-      override def predict(
-          instance: Self,
-          predictParameters: ParameterMap,
-          input: DataSet[Testing])
-        : DataSet[Prediction] = {
-        val self = implicitly[ClassTag[Self]]
-        val testing = implicitly[ClassTag[Testing]]
-        val prediction = implicitly[ClassTag[Prediction]]
-
-        throw new RuntimeException("There is no PredictOperation defined for " + self.runtimeClass +
-          " which takes a DataSet[" + testing.runtimeClass + "] as input and returns a DataSet[" +
-          prediction.runtimeClass + "]")
-      }
-    }
-  }
-
-  implicit def fallbackChainedPredictOperation[
-      L <: Transformer[L],
-      R <: Predictor[R],
-      LI,
-      LO,
-      RI,
-      RO](implicit
-      leftTransformOperation: TransformOperation[L, LI, LO],
-      rightPredictOperation: PredictOperation[R, RI, RO]
-      )
-    : PredictOperation[ChainedPredictor[L, R], LI, RO] = {
-    new PredictOperation[ChainedPredictor[L, R], LI, RO] {
-      override def predict(
-          instance: ChainedPredictor[L, R],
-          predictParameters: ParameterMap,
-          input: DataSet[LI]): DataSet[RO] = {
-        instance.transformer.transform(input, predictParameters)
-        instance.predictor.predict(null, predictParameters)
-      }
-    }
-  }
-}
-
-abstract class PredictOperation[Self, Testing, Prediction]{
-  def predict(
-      instance: Self,
-      predictParameters: ParameterMap,
-      input: DataSet[Testing])
-    : DataSet[Prediction]
-}
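
A sketch of a hand-written PredictOperation for a hypothetical threshold predictor, analogous
to the KMeans stub above (assuming `import org.apache.flink.api.scala._` and a hypothetical
`doubleData: DataSet[Double]`):

    class Thresholder extends Predictor[Thresholder] {
      var threshold = 0.0
    }

    object Thresholder {
      implicit val doublePredict: PredictOperation[Thresholder, Double, Boolean] =
        new PredictOperation[Thresholder, Double, Boolean] {
          override def predict(
              instance: Thresholder,
              predictParameters: ParameterMap,
              input: DataSet[Double]): DataSet[Boolean] = {
            // emit true for every value above the predictor's threshold
            input.map(_ > instance.threshold)
          }
        }
    }

    val flags = new Thresholder().predict(doubleData) // resolves Thresholder.doublePredict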

http://git-wip-us.apache.org/repos/asf/flink/blob/1e574750/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala
deleted file mode 100644
index a68c5d3..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/experimental/Scaler.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.experimental
-
-import scala.reflect.ClassTag
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-import org.apache.flink.ml.math._
-
-class Scaler extends Transformer[Scaler] {
-  var meanValue = 0.0
-}
-
-object Scaler{
-  import Breeze._
-
-  implicit def vTransform[T <: Vector : CanCopy: ClassTag: TypeInformation]
-    = new TransformOperation[Scaler, T, T] {
-    override def transform(
-        instance: Scaler,
-        parameters: ParameterMap,
-        input: DataSet[T]): DataSet[T] = {
-      input.map{
-        vector =>
-          val breezeVector = copy(vector).asBreeze
-          instance.meanValue = instance.meanValue + breeze.stats.mean(breezeVector)
-
-          breezeVector :/= instance.meanValue
-
-          breezeVector.fromBreeze.asInstanceOf[T]
-      }
-    }
-  }
-}
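
Since ChainedTransformer is itself a Transformer, the experimental pieces nest; a closing
sketch, assuming `vectorData: DataSet[DenseVector]` and the implicit instances mentioned
above:

    // scale, offset, then attach the dummy KMeans prediction to every vector
    val pipeline = ChainedPredictor(ChainedTransformer(new Scaler, new Offset), new KMeans)
    val predictions: DataSet[LabeledVector] = pipeline.predict(vectorData)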