Posted to commits@flink.apache.org by tr...@apache.org on 2015/06/26 12:12:42 UTC

[1/2] flink git commit: [FLINK-2116] [ml] Adds evaluate method to Predictor. Adds PredictOperation which can be reused by evaluate if the input data has the form (TestingType, LabelType), where the second tuple field represents the true label.

Repository: flink
Updated Branches:
  refs/heads/master 6ab06278c -> 7a7a29403


[FLINK-2116] [ml] Adds evaluate method to Predictor. Adds PredictOperation which can be reused by evaluate if the input data has the form (TestingType, LabelType), where the second tuple field represents the true label.

This closes #772.

Moves PolynomialFeaturesITSuite into the right package
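
For illustration, a minimal sketch of how the new evaluate method is meant to be used
(hedged; the SVM parameters and the example data below are made up and not part of this
commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.classification.SVM
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.ml.math.DenseVector

    val env = ExecutionEnvironment.getExecutionEnvironment

    // made-up training data and (features, true label) testing data
    val training = env.fromElements(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(-1.0, DenseVector(-1.0, -2.0)))
    val testing = env.fromElements(
      (DenseVector(1.5, 2.5), 1.0),
      (DenseVector(-1.5, -2.5), -1.0))

    val svm = SVM().setIterations(10)
    svm.fit(training)

    // evaluate pairs each true label with the model's prediction for the features
    val truthAndPrediction: DataSet[(Double, Double)] = svm.evaluate(testing)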


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a7a2940
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a7a2940
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a7a2940

Branch: refs/heads/master
Commit: 7a7a294033ef99c596e59f670e2e4ae9262f5c5f
Parents: 38ee125
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 2 14:34:27 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 26 12:09:35 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/ml/classification/SVM.scala    |  74 ++----
 .../flink/ml/pipeline/ChainedPredictor.scala    |  58 +++--
 .../flink/ml/pipeline/ChainedTransformer.scala  |  27 ++-
 .../apache/flink/ml/pipeline/Estimator.scala    | 143 ++++++------
 .../apache/flink/ml/pipeline/Predictor.scala    | 227 ++++++++++++++-----
 .../apache/flink/ml/pipeline/Transformer.scala  | 152 ++++++-------
 .../flink/ml/preprocessing/MinMaxScaler.scala   |  17 +-
 .../ml/preprocessing/PolynomialFeatures.scala   |  16 +-
 .../flink/ml/preprocessing/StandardScaler.scala | 131 ++++++-----
 .../apache/flink/ml/recommendation/ALS.scala    |   6 +-
 .../regression/MultipleLinearRegression.scala   |  69 ++----
 .../flink/ml/classification/SVMITSuite.scala    |   8 +-
 .../ml/feature/PolynomialFeaturesITSuite.scala  | 127 -----------
 .../flink/ml/pipeline/PipelineITSuite.scala     |  35 ++-
 .../PolynomialFeaturesITSuite.scala             | 124 ++++++++++
 .../MultipleLinearRegressionITSuite.scala       |   4 +-
 16 files changed, 649 insertions(+), 569 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
index b259090..bd46204 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.ml.classification
 
-import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.ml.pipeline.{PredictOperation, FitOperation, PredictDataSetOperation,
+  Predictor}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
@@ -29,7 +31,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.ml._
 import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
 import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{DenseVector, Vector}
 import org.apache.flink.ml.math.Breeze._
 
 import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
@@ -124,7 +126,7 @@ class SVM extends Predictor[SVM] {
   import SVM._
 
   /** Stores the learned weight vector after the fit operation */
-  var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
+  var weightsOption: Option[DataSet[DenseVector]] = None
 
   /** Sets the number of data blocks/partitions
     *
@@ -228,70 +230,20 @@ object SVM{
 
   // ========================================== Operations =========================================
 
-  /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
-    * [[LabeledVector]]
-    *
-    * @tparam T Subtype of [[Vector]]
-    * @return
-    */
-  implicit def predictValues[T <: Vector] = {
-    new PredictOperation[SVM, T, LabeledVector]{
-      override def predict(
-          instance: SVM,
-          predictParameters: ParameterMap,
-          input: DataSet[T])
-        : DataSet[LabeledVector] = {
-
-        instance.weightsOption match {
-          case Some(weights) => {
-            input.mapWithBcVariable(weights){
-              (vector, weights) => {
-                val dotProduct = weights dot vector.asBreeze
-
-                LabeledVector(dotProduct, vector)
-              }
-            }
-          }
-
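+  /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The prediction value
+    * is the dot product of the trained weight vector and the input vector.
+    *
+    * @tparam T Subtype of [[Vector]]
+    * @return
+    */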
+  implicit def predictVectors[T <: Vector] = {
+    new PredictOperation[SVM, DenseVector, T, Double](){
+      override def getModel(self: SVM, predictParameters: ParameterMap): DataSet[DenseVector] = {
+        self.weightsOption match {
+          case Some(model) => model
           case None => {
             throw new RuntimeException("The SVM model has not been trained. Call first fit" +
               "before calling the predict operation.")
           }
         }
       }
-    }
-  }
-
-  /** [[org.apache.flink.ml.pipeline.PredictOperation]] for [[LabeledVector ]]types. The result type
-    * is a [[(Double, Double)]] tuple, corresponding to (truth, prediction)
-    *
-    * @return A DataSet[(Double, Double)] where each tuple is a (truth, prediction) pair.
-    */
-  implicit def predictLabeledValues = {
-    new PredictOperation[SVM, LabeledVector, (Double, Double)]{
-      override def predict(
-          instance: SVM,
-          predictParameters: ParameterMap,
-          input: DataSet[LabeledVector])
-        : DataSet[(Double, Double)] = {
-
-        instance.weightsOption match {
-          case Some(weights) => {
-            input.mapWithBcVariable(weights){
-              (labeledVector, weights) => {
-                val prediction = weights dot labeledVector.vector.asBreeze
-                val truth = labeledVector.label
 
-                (truth, prediction)
-              }
-            }
-          }
-
-          case None => {
-            throw new RuntimeException("The SVM model has not been trained. Call first fit" +
-              "before calling the predict operation.")
-          }
-        }
+      override def predict(value: T, model: DenseVector): Double = {
+        value.asBreeze dot model.asBreeze
       }
     }
   }
@@ -368,7 +320,7 @@ object SVM{
         }
 
        // Store the learned weight vector in the given instance
-        instance.weightsOption = Some(resultingWeights)
+        instance.weightsOption = Some(resultingWeights.map(_.fromBreeze[DenseVector]))
       }
     }
   }
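
With this per-element PredictOperation, the DataSet-level predict and evaluate methods are
derived by the default operations in the Predictor companion object (see Predictor.scala
below). A hedged sketch, assuming a fitted svm and made-up data set names:

    // predict attaches the raw decision value to each input vector
    val scored: DataSet[(DenseVector, Double)] = svm.predict(vectors)

    // evaluate consumes (features, true label) pairs and emits (truth, prediction)
    val pairs: DataSet[(Double, Double)] = svm.evaluate(labeledTestData)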

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
index 85a5b9e..bf5a8b2 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.ml.pipeline
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.ml.common.ParameterMap
 
@@ -40,15 +41,15 @@ case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer:
 
 object ChainedPredictor{
 
-  /** [[PredictOperation]] for the [[ChainedPredictor]].
+  /** [[PredictDataSetOperation]] for the [[ChainedPredictor]].
     *
-    * The [[PredictOperation]] requires the [[TransformOperation]] of the preceding [[Transformer]]
-    * and the [[PredictOperation]] of the trailing [[Predictor]]. Upon calling predict, the testing
-    * data is first transformed by the preceding [[Transformer]] and the result is then used to
-    * calculate the prediction via the trailing [[Predictor]].
+    * The [[PredictDataSetOperation]] requires the [[TransformDataSetOperation]] of the preceding
+    * [[Transformer]] and the [[PredictDataSetOperation]] of the trailing [[Predictor]]. Upon
+    * calling predict, the testing data is first transformed by the preceding [[Transformer]] and
+    * the result is then used to calculate the prediction via the trailing [[Predictor]].
     *
-    * @param transformOperation [[TransformOperation]] for the preceding [[Transformer]]
-    * @param predictOperation [[PredictOperation]] for the trailing [[Predictor]]
+    * @param transformOperation [[TransformDataSetOperation]] for the preceding [[Transformer]]
+    * @param predictOperation [[PredictDataSetOperation]] for the trailing [[Predictor]]
     * @tparam T Type of the preceding [[Transformer]]
     * @tparam P Type of the trailing [[Predictor]]
     * @tparam Testing Type of the testing data
@@ -62,12 +63,12 @@ object ChainedPredictor{
       Testing,
       Intermediate,
       Prediction](
-      implicit transformOperation: TransformOperation[T, Testing, Intermediate],
-      predictOperation: PredictOperation[P, Intermediate, Prediction])
-    : PredictOperation[ChainedPredictor[T, P], Testing, Prediction] = {
+      implicit transformOperation: TransformDataSetOperation[T, Testing, Intermediate],
+      predictOperation: PredictDataSetOperation[P, Intermediate, Prediction])
+    : PredictDataSetOperation[ChainedPredictor[T, P], Testing, Prediction] = {
 
-    new PredictOperation[ChainedPredictor[T, P], Testing, Prediction] {
-      override def predict(
+    new PredictDataSetOperation[ChainedPredictor[T, P], Testing, Prediction] {
+      override def predictDataSet(
           instance: ChainedPredictor[T, P],
           predictParameters: ParameterMap,
           input: DataSet[Testing])
@@ -81,15 +82,15 @@ object ChainedPredictor{
 
   /** [[FitOperation]] for the [[ChainedPredictor]].
     *
-    * The [[FitOperation]] requires the [[FitOperation]] and the [[TransformOperation]] of the
-    * preceding [[Transformer]] as well as the [[FitOperation]] of the trailing [[Predictor]].
+    * The [[FitOperation]] requires the [[FitOperation]] and the [[TransformDataSetOperation]] of
+    * the preceding [[Transformer]] as well as the [[FitOperation]] of the trailing [[Predictor]].
     * Upon calling fit, the preceding [[Transformer]] is first fitted to the training data.
     * The training data is then transformed by the fitted [[Transformer]]. The transformed data
     * is then used to fit the [[Predictor]].
     *
     * @param fitOperation [[FitOperation]] of the preceding [[Transformer]]
-    * @param transformOperation [[TransformOperation]] of the preceding [[Transformer]]
-    * @param predictorFitOperation [[PredictOperation]] of the trailing [[Predictor]]
+    * @param transformOperation [[TransformDataSetOperation]] of the preceding [[Transformer]]
+    * @param predictorFitOperation [[FitOperation]] of the trailing [[Predictor]]
     * @tparam L Type of the preceding [[Transformer]]
     * @tparam R Type of the trailing [[Predictor]]
     * @tparam I Type of the training data
@@ -98,7 +99,7 @@ object ChainedPredictor{
     */
   implicit def chainedFitOperation[L <: Transformer[L], R <: Predictor[R], I, T](implicit
     fitOperation: FitOperation[L, I],
-    transformOperation: TransformOperation[L, I, T],
+    transformOperation: TransformDataSetOperation[L, I, T],
     predictorFitOperation: FitOperation[R, T]): FitOperation[ChainedPredictor[L, R], I] = {
     new FitOperation[ChainedPredictor[L, R], I] {
       override def fit(
@@ -112,4 +113,27 @@ object ChainedPredictor{
       }
     }
   }
+
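+  /** [[EvaluateDataSetOperation]] for the [[ChainedPredictor]]. The testing data is first
+    * transformed by the preceding [[Transformer]] and the result is then evaluated by the
+    * trailing [[Predictor]].
+    */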
+  implicit def chainedEvaluationOperation[
+      T <: Transformer[T],
+      P <: Predictor[P],
+      Testing,
+      Intermediate,
+      PredictionValue](
+      implicit transformOperation: TransformDataSetOperation[T, Testing, Intermediate],
+      evaluateOperation: EvaluateDataSetOperation[P, Intermediate, PredictionValue],
+      testingTypeInformation: TypeInformation[Testing],
+      predictionValueTypeInformation: TypeInformation[PredictionValue])
+    : EvaluateDataSetOperation[ChainedPredictor[T, P], Testing, PredictionValue] = {
+    new EvaluateDataSetOperation[ChainedPredictor[T, P], Testing, PredictionValue] {
+      override def evaluateDataSet(
+          instance: ChainedPredictor[T, P],
+          evaluateParameters: ParameterMap,
+          testing: DataSet[Testing])
+        : DataSet[(PredictionValue, PredictionValue)] = {
+        val intermediate = instance.transformer.transform(testing, evaluateParameters)
+        instance.predictor.evaluate(intermediate, evaluateParameters)
+      }
+    }
+  }
 }
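
A hedged sketch of chained evaluation (operator and data set names are made up): the
preceding transformer scales the testing tuples, the trailing predictor evaluates them.

    val scaler = StandardScaler()
    val svm = SVM()
    val pipeline = scaler.chainPredictor(svm)

    pipeline.fit(trainingData)

    // testData: DataSet[(DenseVector, Double)] of (features, true label)
    val truthAndPrediction: DataSet[(Double, Double)] = pipeline.evaluate(testData)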

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
index e443b80..bdf917d 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
@@ -39,13 +39,13 @@ case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L,
 
 object ChainedTransformer{
 
-  /** [[TransformOperation]] implementation for [[ChainedTransformer]].
+  /** [[TransformDataSetOperation]] implementation for [[ChainedTransformer]].
     *
     * First the transform operation of the left [[Transformer]] is called with the input data. This
     * generates intermediate data which is fed to the right [[Transformer]]'s transform operation.
     *
-    * @param transformOpLeft [[TransformOperation]] for the left [[Transformer]]
-    * @param transformOpRight [[TransformOperation]] for the right [[Transformer]]
+    * @param transformOpLeft [[TransformDataSetOperation]] for the left [[Transformer]]
+    * @param transformOpRight [[TransformDataSetOperation]] for the right [[Transformer]]
     * @tparam L Type of the left [[Transformer]]
     * @tparam R Type of the right [[Transformer]]
     * @tparam I Type of the input data
@@ -59,17 +59,20 @@ object ChainedTransformer{
       I,
       T,
       O](implicit
-      transformOpLeft: TransformOperation[L, I, T],
-      transformOpRight: TransformOperation[R, T, O])
-    : TransformOperation[ChainedTransformer[L,R], I, O] = {
+      transformOpLeft: TransformDataSetOperation[L, I, T],
+      transformOpRight: TransformDataSetOperation[R, T, O])
+    : TransformDataSetOperation[ChainedTransformer[L,R], I, O] = {
 
-    new TransformOperation[ChainedTransformer[L, R], I, O] {
-      override def transform(
+    new TransformDataSetOperation[ChainedTransformer[L, R], I, O] {
+      override def transformDataSet(
           chain: ChainedTransformer[L, R],
           transformParameters: ParameterMap,
           input: DataSet[I]): DataSet[O] = {
-        val intermediateResult = transformOpLeft.transform(chain.left, transformParameters, input)
-        transformOpRight.transform(chain.right, transformParameters, intermediateResult)
+        val intermediateResult = transformOpLeft.transformDataSet(
+          chain.left,
+          transformParameters,
+          input)
+        transformOpRight.transformDataSet(chain.right, transformParameters, intermediateResult)
       }
     }
   }
@@ -81,7 +84,7 @@ object ChainedTransformer{
     * right [[Transformer]].
     *
     * @param leftFitOperation [[FitOperation]] for the left [[Transformer]]
-    * @param leftTransformOperation [[TransformOperation]] for the left [[Transformer]]
+    * @param leftTransformOperation [[TransformDataSetOperation]] for the left [[Transformer]]
     * @param rightFitOperation [[FitOperation]] for the right [[Transformer]]
     * @tparam L Type of the left [[Transformer]]
     * @tparam R Type of the right [[Transformer]]
@@ -91,7 +94,7 @@ object ChainedTransformer{
     */
   implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit
       leftFitOperation: FitOperation[L, I],
-      leftTransformOperation: TransformOperation[L, I, T],
+      leftTransformOperation: TransformDataSetOperation[L, I, T],
       rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = {
     new FitOperation[ChainedTransformer[L, R], I] {
       override def fit(

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
index e3031f7..dbe0782 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.ml.pipeline
 
 import scala.reflect.ClassTag
+import scala.reflect.runtime.universe._
 
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
@@ -68,7 +69,9 @@ object Estimator{
     * @tparam Training Type of training data
     * @return
     */
-  implicit def fallbackFitOperation[Self: ClassTag, Training: ClassTag]
+  implicit def fallbackFitOperation[
+      Self: TypeTag,
+      Training: TypeTag]
     : FitOperation[Self, Training] = {
     new FitOperation[Self, Training]{
       override def fit(
@@ -76,88 +79,90 @@ object Estimator{
           fitParameters: ParameterMap,
           input: DataSet[Training])
         : Unit = {
+        val self = typeOf[Self]
+        val training = typeOf[Training]
 
-          val self = implicitly[ClassTag[Self]]
-          val training = implicitly[ClassTag[Training]]
-
-          throw new RuntimeException("There is no FitOperation defined for " + self.runtimeClass +
-            " which trains on a DataSet[" + training.runtimeClass + "]")
-        }
+        throw new RuntimeException("There is no FitOperation defined for " + self +
+          " which trains on a DataSet[" + training + "]")
       }
     }
+  }
 
-  /** Fallback [[FitOperation]] type class implementation for [[ChainedTransformer]]. The fallback
-    * implementation is used if the Scala compiler could not instantiate the chained fit operation
-    * defined in the companion object of [[ChainedTransformer]]. This is usually the case if either
-    * a [[FitOperation]] or a [[TransformOperation]] could not be instantiated for one of the
-    * leaves of the chained transformer. The fallback [[FitOperation]] calls the first the
-    * fit operation of the left transformer, then the transform operation of the left transformer
-    * and last the fit operation of the right transformer.
+  /** Fallback [[PredictDataSetOperation]] if a [[Predictor]] is called with an unsupported input
+    * data type. The fallback [[PredictDataSetOperation]] lets the system fail with a
+    * [[RuntimeException]] stating which input and output data types were inferred but for which no
+    * [[PredictDataSetOperation]] could be found.
     *
-    * @param leftFitOperation [[FitOperation]] of the left transformer
-    * @param leftTransformOperation [[TransformOperation]] of the left transformer
-    * @param rightFitOperaiton [[FitOperation]] of the right transformer
-    * @tparam L Type of left transformer
-    * @tparam R Type of right transformer
-    * @tparam LI Input type of left transformer's [[FitOperation]]
-    * @tparam LO Output type of left transformer's [[TransformOperation]]
+    * @tparam Self Type of the [[Predictor]]
+    * @tparam Testing Type of the testing data
     * @return
     */
-  implicit def fallbackChainedFitOperationTransformer[
-      L <: Transformer[L],
-      R <: Transformer[R],
-      LI,
-      LO](implicit
-      leftFitOperation: FitOperation[L, LI],
-      leftTransformOperation: TransformOperation[L, LI, LO],
-      rightFitOperaiton: FitOperation[R, LO])
-    : FitOperation[ChainedTransformer[L, R], LI] = {
-    new FitOperation[ChainedTransformer[L, R], LI] {
-      override def fit(
-          instance: ChainedTransformer[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[LI]): Unit = {
-        instance.left.fit(input, fitParameters)
-        val intermediate = instance.left.transform(input, fitParameters)
-        instance.right.fit(intermediate, fitParameters)
+  implicit def fallbackPredictOperation[
+      Self: TypeTag,
+      Testing: TypeTag]
+    : PredictDataSetOperation[Self, Testing, Any] = {
+    new PredictDataSetOperation[Self, Testing, Any] {
+      override def predictDataSet(
+          instance: Self,
+          predictParameters: ParameterMap,
+          input: DataSet[Testing])
+        : DataSet[Any] = {
+        val self = typeOf[Self]
+        val testing = typeOf[Testing]
+
+        throw new RuntimeException("There is no PredictOperation defined for " + self +
+          " which takes a DataSet[" + testing + "] as input.")
       }
     }
   }
 
-  /** Fallback [[FitOperation]] type class implementation for [[ChainedPredictor]]. The fallback
-    * implementation is used if the Scala compiler could not instantiate the chained fit operation
-    * defined in the companion object of [[ChainedPredictor]]. This is usually the case if either
-    * a [[FitOperation]] or a [[TransformOperation]] could not be instantiated for one of the
-    * leaves of the chained transformer. The fallback [[FitOperation]] calls the first the
-    * fit operation of the left transformer, then the transform operation of the left transformer
-    * and last the fit operation of the right transformer.
+  /** Fallback [[TransformDataSetOperation]] for [[Transformer]]s which do not support the input
+    * or output type with which they are called. This is usually the case if pipeline operators
+    * with incompatible input/output types are chained. In order to detect these failures, the
+    * fallback [[TransformDataSetOperation]] throws a [[RuntimeException]] with the corresponding
+    * input/output types. Consequently, a wrong pipeline will be detected during Flink's
+    * pre-flight phase and thus prior to execution time.
     *
-    * @param leftFitOperation [[FitOperation]] of the left transformer
-    * @param leftTransformOperation [[TransformOperation]] of the left transformer
-    * @param rightFitOperaiton [[FitOperation]] of the right transformer
-    * @tparam L Type of left transformer
-    * @tparam R Type of right transformer
-    * @tparam LI Input type of left transformer's [[FitOperation]]
-    * @tparam LO Output type of left transformer's [[TransformOperation]]
+    * @tparam Self Type of the [[Transformer]] for which the [[TransformDataSetOperation]] is
+    *              defined
+    * @tparam IN Input data type of the [[TransformDataSetOperation]]
     * @return
     */
-  implicit def fallbackChainedFitOperationPredictor[
-  L <: Transformer[L],
-  R <: Predictor[R],
-  LI,
-  LO](implicit
-    leftFitOperation: FitOperation[L, LI],
-    leftTransformOperation: TransformOperation[L, LI, LO],
-    rightFitOperaiton: FitOperation[R, LO])
-  : FitOperation[ChainedPredictor[L, R], LI] = {
-    new FitOperation[ChainedPredictor[L, R], LI] {
-      override def fit(
-          instance: ChainedPredictor[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[LI]): Unit = {
-        instance.transformer.fit(input, fitParameters)
-        val intermediate = instance.transformer.transform(input, fitParameters)
-        instance.predictor.fit(intermediate, fitParameters)
+  implicit def fallbackTransformOperation[
+  Self: TypeTag,
+  IN: TypeTag]
+  : TransformDataSetOperation[Self, IN, Any] = {
+    new TransformDataSetOperation[Self, IN, Any] {
+      override def transformDataSet(
+        instance: Self,
+        transformParameters: ParameterMap,
+        input: DataSet[IN])
+      : DataSet[Any] = {
+        val self = typeOf[Self]
+        val in = typeOf[IN]
+
+        throw new RuntimeException("There is no TransformOperation defined for " +
+          self +  " which takes a DataSet[" + in +
+          "] as input.")
+      }
+    }
+  }
+
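+  /** Fallback [[EvaluateDataSetOperation]] if a [[Predictor]] is called with an unsupported
+    * testing data type. It lets the system fail with a [[RuntimeException]] stating the
+    * inferred types for which no [[EvaluateDataSetOperation]] could be found.
+    */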
+  implicit def fallbackEvaluateOperation[
+      Self: TypeTag,
+      Testing: TypeTag]
+    : EvaluateDataSetOperation[Self, Testing, Any] = {
+    new EvaluateDataSetOperation[Self, Testing, Any] {
+      override def evaluateDataSet(
+        instance: Self,
+        evaluateParameters: ParameterMap,
+        input: DataSet[Testing])
+      : DataSet[(Any, Any)] = {
+        val self = typeOf[Self]
+        val testing = typeOf[Testing]
+
+        throw new RuntimeException("There is no PredictOperation defined for " + self +
+          " which takes a DataSet[" + testing + "] as input.")
       }
     }
   }
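
These fallbacks turn a mistyped pipeline into an early failure. A hedged sketch (made-up
types; the exception text follows the code above):

    // No PredictOperation[SVM, _, String, _] exists, so the fallback implicit is
    // selected and predict fails while the program is constructed, i.e. during
    // the pre-flight phase and not at job execution time.
    val strings: DataSet[String] = env.fromElements("not", "a", "vector")
    svm.predict(strings)
    // => RuntimeException: There is no PredictOperation defined for ... which
    //    takes a DataSet[String] as input.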

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
index 9bb5c5c..cd9cc51 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
@@ -18,18 +18,19 @@
 
 package org.apache.flink.ml.pipeline
 
-import scala.reflect.ClassTag
+import org.apache.flink.api.common.typeinfo.TypeInformation
 
-import org.apache.flink.api.scala.DataSet
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
 import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
 
 /** Predictor trait for Flink's pipeline operators.
   *
   * A [[Predictor]] calculates predictions for testing data based on the model it learned during
   * the fit operation (training phase). In order to do that, the implementing class has to provide
-  * a [[FitOperation]] and a [[PredictOperation]] implementation for the correct types. The implicit
-  * values should be put into the scope of the companion object of the implementing class to make
-  * them retrievable for the Scala compiler.
+  * a [[FitOperation]] and a [[PredictDataSetOperation]] implementation for the correct types. The
+  * implicit values should be put into the scope of the companion object of the implementing class
+  * to make them retrievable for the Scala compiler.
   *
   * The pipeline mechanism has been inspired by scikit-learn
   *
@@ -39,11 +40,12 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
   that: Self =>
 
   /** Predict testing data according the learned model. The implementing class has to provide
-    * a corresponding implementation of [[PredictOperation]] which contains the prediction logic.
+    * a corresponding implementation of [[PredictDataSetOperation]] which contains the prediction
+    * logic.
     *
     * @param testing Testing data which shall be predicted
     * @param predictParameters Additional parameters for the prediction
-    * @param predictor [[PredictOperation]] which encapsulates the prediction logic
+    * @param predictor [[PredictDataSetOperation]] which encapsulates the prediction logic
     * @tparam Testing Type of the testing data
     * @tparam Prediction Type of the prediction data
     * @return
@@ -51,84 +53,132 @@ trait Predictor[Self] extends Estimator[Self] with WithParameters {
   def predict[Testing, Prediction](
       testing: DataSet[Testing],
       predictParameters: ParameterMap = ParameterMap.Empty)(implicit
-      predictor: PredictOperation[Self, Testing, Prediction])
+      predictor: PredictDataSetOperation[Self, Testing, Prediction])
     : DataSet[Prediction] = {
     FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
-    predictor.predict(this, predictParameters, testing)
+    predictor.predictDataSet(this, predictParameters, testing)
+  }
+
+  /** Evaluates the testing data by computing the prediction value and returning a pair of true
+    * label value and prediction value. It is important that the implementation chooses a Testing
+    * type from which it can extract the true label value.
+    *
+    * @param testing Testing data which shall be evaluated
+    * @param evaluateParameters Additional parameters for the evaluation
+    * @param evaluator [[EvaluateDataSetOperation]] which encapsulates the evaluation logic
+    * @tparam Testing Type of the testing data
+    * @tparam PredictionValue Type of the prediction value
+    * @return A [[DataSet]] of (true label, prediction) tuples
+    */
+  def evaluate[Testing, PredictionValue](
+      testing: DataSet[Testing],
+      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
+      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
+    : DataSet[(PredictionValue, PredictionValue)] = {
+    FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
+    evaluator.evaluateDataSet(this, evaluateParameters, testing)
   }
 }
 
-object Predictor{
+object Predictor {
 
-  /** Fallback [[PredictOperation]] if a [[Predictor]] is called with a not supported input data
-    * type. The fallback [[PredictOperation]] lets the system fail with a [[RuntimeException]]
-    * stating which input and output data types were inferred but for which no [[PredictOperation]]
-    * could be found.
+  /** Default [[PredictDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple
+    * of testing element and its prediction value.
     *
-    * @tparam Self Type of the [[Predictor]]
-    * @tparam Testing Type of the testing data
+    * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after
+    * the PredictOperation implicit parameter. Otherwise, if they are defined as context bounds,
+    * the Scala compiler does not find the implicit [[PredictOperation]] value.
+    *
+    * @param predictOperation [[PredictOperation]] which calculates the prediction per element
+    * @param testingTypeInformation [[TypeInformation]] of the testing data type
+    * @param predictionValueTypeInformation [[TypeInformation]] of the prediction value type
+    * @tparam Instance Type of the [[Predictor]]
+    * @tparam Model Type of the model
+    * @tparam Testing Type of the testing data
+    * @tparam PredictionValue Type of the prediction value
     * @return
     */
-  implicit def fallbackPredictOperation[Self: ClassTag, Testing: ClassTag]
-    : PredictOperation[Self, Testing, Any] = {
-    new PredictOperation[Self, Testing, Any] {
-      override def predict(
-          instance: Self,
+  implicit def defaultPredictDataSetOperation[
+      Instance <: Estimator[Instance],
+      Model,
+      Testing,
+      PredictionValue](
+      implicit predictOperation: PredictOperation[Instance, Model, Testing, PredictionValue],
+      testingTypeInformation: TypeInformation[Testing],
+      predictionValueTypeInformation: TypeInformation[PredictionValue])
+    : PredictDataSetOperation[Instance, Testing, (Testing, PredictionValue)] = {
+    new PredictDataSetOperation[Instance, Testing, (Testing, PredictionValue)] {
+      override def predictDataSet(
+          instance: Instance,
           predictParameters: ParameterMap,
           input: DataSet[Testing])
-        : DataSet[Any] = {
-        val self = implicitly[ClassTag[Self]]
-        val testing = implicitly[ClassTag[Testing]]
+        : DataSet[(Testing, PredictionValue)] = {
+        val resultingParameters = instance.parameters ++ predictParameters
 
-        throw new RuntimeException("There is no PredictOperation defined for " + self.runtimeClass +
-          " which takes a DataSet[" + testing.runtimeClass + "] as input.")
+        val model = predictOperation.getModel(instance, resultingParameters)
+
+        implicit val resultTypeInformation = createTypeInformation[(Testing, PredictionValue)]
+
+        input.mapWithBcVariable(model){
+          (element, model) => {
+            (element, predictOperation.predict(element, model))
+          }
+        }
       }
     }
   }
 
-  /** Fallback [[PredictOperation]] for a [[ChainedPredictor]] if a [[TransformOperation]] for
-    * one of the [[Transformer]] and its respective types or the [[PredictOperation]] for the
-    * [[Predictor]] and its respective type could not be found. This is usually the case, if the
-    * the pipeline contains pipeline operators which work on incompatible types.
+  /** Default [[EvaluateDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple
+    * of true label value and predicted label value.
     *
-    * The fallback [[PredictOperation]] first transforms the input data by calling the transform
-    * method of the [[Transformer]] and then the predict method of the [[Predictor]].
+    * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after
+    * the PredictOperation implicit parameter. Otherwise, if they are defined as context bounds,
+    * the Scala compiler does not find the implicit [[PredictOperation]] value.
     *
-    * @param leftTransformOperation [[TransformOperation]] of the [[Transformer]]
-    * @param rightPredictOperation [[PredictOperation]] of the [[Predictor]]
-    * @tparam L Type of the [[Transformer]]
-    * @tparam R Type of the [[Predictor]]
-    * @tparam LI Input type of the [[Transformer]]
-    * @tparam LO Output type of the [[Transformer]]
-    * @tparam RO Prediction type of the [[Predictor]]
+    * @param predictOperation [[PredictOperation]] which calculates the prediction per element
+    * @param testingTypeInformation [[TypeInformation]] of the testing data type
+    * @param predictionValueTypeInformation [[TypeInformation]] of the prediction value type
+    * @tparam Instance Type of the [[Predictor]]
+    * @tparam Model Type of the model
+    * @tparam Testing Type of the testing data
+    * @tparam PredictionValue Type of the prediction value
     * @return
     */
-  implicit def fallbackChainedPredictOperation[
-      L <: Transformer[L],
-      R <: Predictor[R],
-      LI,
-      LO,
-      RO](implicit
-      leftTransformOperation: TransformOperation[L, LI, LO],
-      rightPredictOperation: PredictOperation[R, LO, RO]
-      )
-    : PredictOperation[ChainedPredictor[L, R], LI, RO] = {
-    new PredictOperation[ChainedPredictor[L, R], LI, RO] {
-      override def predict(
-          instance: ChainedPredictor[L, R],
-          predictParameters: ParameterMap,
-          input: DataSet[LI]): DataSet[RO] = {
-        val intermediate = instance.transformer.transform(input, predictParameters)
-        instance.predictor.predict(intermediate, predictParameters)
+  implicit def defaultEvaluateDataSetOperation[
+      Instance <: Estimator[Instance],
+      Model,
+      Testing,
+      PredictionValue](
+      implicit predictOperation: PredictOperation[Instance, Model, Testing, PredictionValue],
+      testingTypeInformation: TypeInformation[Testing],
+      predictionValueTypeInformation: TypeInformation[PredictionValue])
+    : EvaluateDataSetOperation[Instance, (Testing, PredictionValue), PredictionValue] = {
+    new EvaluateDataSetOperation[Instance, (Testing, PredictionValue), PredictionValue] {
+      override def evaluateDataSet(
+          instance: Instance,
+          evaluateParameters: ParameterMap,
+          testing: DataSet[(Testing, PredictionValue)])
+        : DataSet[(PredictionValue, PredictionValue)] = {
+        val resultingParameters = instance.parameters ++ evaluateParameters
+        val model = predictOperation.getModel(instance, resultingParameters)
+
+        implicit val resultTypeInformation =
+          createTypeInformation[(PredictionValue, PredictionValue)]
+
+        testing.mapWithBcVariable(model){
+          (element, model) => {
+            (element._2, predictOperation.predict(element._1, model))
+          }
+        }
       }
     }
   }
 }
 
-/** Type class for the predict operation of [[Predictor]].
+/** Type class for the predict operation of [[Predictor]]. This predict operation works on DataSets.
   *
-  * Predictors have to implement this trait and make the result available as an implicit value or
-  * function in the scope of their companion objects.
+  * [[Predictor]]s either have to implement this trait or the [[PredictOperation]] trait. The
+  * implementation has to be made available as an implicit value or function in the scope of
+  * their companion objects.
   *
   * The first type parameter is the type of the implementing [[Predictor]] class so that the Scala
   * compiler includes the companion object of this class in the search scope for the implicit
@@ -138,10 +188,65 @@ object Predictor{
   * @tparam Testing Type of testing data
   * @tparam Prediction Type of predicted data
   */
-trait PredictOperation[Self, Testing, Prediction]{
-  def predict(
+trait PredictDataSetOperation[Self, Testing, Prediction] extends Serializable{
+
+  /** Calculates the predictions for all elements in the [[DataSet]] input
+    *
+    * @param instance Instance of the [[Predictor]]
+    * @param predictParameters Additional parameters for the prediction
+    * @param input [[DataSet]] of testing elements
+    * @return [[DataSet]] containing the calculated predictions
+    */
+  def predictDataSet(
       instance: Self,
       predictParameters: ParameterMap,
       input: DataSet[Testing])
     : DataSet[Prediction]
 }
+
+/** Type class for the predict operation. It takes an element and the model and then computes the
+  * prediction value for this element.
+  *
+  * It is sufficient for a [[Predictor]] to implement only this trait to support the evaluate and
+  * predict methods.
+  *
+  * @tparam Instance Type of the [[Predictor]]
+  * @tparam Model Type of the model
+  * @tparam Testing Type of a single testing element
+  * @tparam Prediction Type of the prediction value
+  */
+trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializable{
+
+  /** Retrieves the model of the [[Predictor]] for which this operation was defined
+    *
+    * @param instance Instance of the [[Predictor]]
+    * @param predictParameters Additional parameters for the prediction
+    * @return The model of the [[Predictor]]
+    */
+  def getModel(instance: Instance, predictParameters: ParameterMap): DataSet[Model]
+
+  /** Calculates the prediction for a single element given the model of the [[Predictor]].
+    *
+    * @param value Testing element for which the prediction shall be calculated
+    * @param model Model of the [[Predictor]]
+    * @return The prediction value
+    */
+  def predict(value: Testing, model: Model): Prediction
+}
+
+/** Type class for the evaluate operation of [[Predictor]]. This evaluate operation works on
+  * DataSets.
+  *
+  * It takes a [[DataSet]] of some type. For each element of this [[DataSet]] the evaluate method
+  * computes the prediction value and returns a tuple of true label value and prediction value.
+  *
+  * @tparam Instance Type of the [[Predictor]]
+  * @tparam Testing Type of the testing data
+  * @tparam PredictionValue Type of the prediction value
+  */
+trait EvaluateDataSetOperation[Instance, Testing, PredictionValue] extends Serializable{
+  def evaluateDataSet(
+      instance: Instance,
+      evaluateParameters: ParameterMap,
+      testing: DataSet[Testing])
+    : DataSet[(PredictionValue, PredictionValue)]
+}
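
A new Predictor therefore only has to provide a FitOperation and a PredictOperation in its
companion object; predict and evaluate are then derived by the default operations above. A
minimal hedged sketch with a made-up MeanPredictor that always predicts the training mean:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.ParameterMap
    import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}

    class MeanPredictor extends Predictor[MeanPredictor] {
      var meanOption: Option[DataSet[Double]] = None
    }

    object MeanPredictor {
      implicit val fitMean = new FitOperation[MeanPredictor, Double] {
        override def fit(
            instance: MeanPredictor,
            fitParameters: ParameterMap,
            input: DataSet[Double]): Unit = {
          // the "model" is simply the mean of the training values
          instance.meanOption = Some(
            input
              .map(x => (x, 1L))
              .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
              .map(sumCount => sumCount._1 / sumCount._2))
        }
      }

      implicit val predictMean = new PredictOperation[MeanPredictor, Double, Double, Double] {
        override def getModel(
            self: MeanPredictor,
            predictParameters: ParameterMap): DataSet[Double] = {
          self.meanOption.getOrElse(
            throw new RuntimeException("MeanPredictor has not been fitted."))
        }

        // every element is predicted as the training mean
        override def predict(value: Double, model: Double): Double = model
      }
    }

With these implicits in scope, predict on a DataSet[Double] yields (value, mean) tuples and
evaluate on a DataSet[(Double, Double)] yields (trueLabel, mean) tuples.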

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
index 7e2c744..014ad2b 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
@@ -18,18 +18,21 @@
 
 package org.apache.flink.ml.pipeline
 
-import scala.reflect.ClassTag
-
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.DataSet
+import org.apache.flink.ml._
 import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
 
+import scala.reflect.ClassTag
+
 /** Transformer trait for Flink's pipeline operators.
   *
   * A Transformer transforms a [[DataSet]] of an input type into a [[DataSet]] of an output type.
   * Furthermore, a [[Transformer]] is also an [[Estimator]], because some transformations depend
   * on the training data. In order to do that the implementing class has to provide a
-  * [[TransformOperation]] and [[FitOperation]] implementation. The Scala compiler finds these
-  * implicit values if it is put in the scope of the companion object of the implementing class.
+  * [[TransformDataSetOperation]] and [[FitOperation]] implementation. The Scala compiler finds
+  * these implicit values if it is put in the scope of the companion object of the implementing
+  * class.
   *
   * [[Transformer]] can be chained with other [[Transformer]] and [[Predictor]] to create
   * pipelines. These pipelines can consist of an arbitrary number of [[Transformer]] and at most
@@ -46,11 +49,13 @@ trait Transformer[Self <: Transformer[Self]]
   that: Self =>
 
   /** Transform operation which transforms an input [[DataSet]] of type I into an output [[DataSet]]
-    * of type O. The actual transform operation is implemented within the [[TransformOperation]].
+    * of type O. The actual transform operation is implemented within the
+    * [[TransformDataSetOperation]].
     *
     * @param input Input [[DataSet]] of type I
-    * @param transformParameters Additional parameters for the [[TransformOperation]]
-    * @param transformOperation [[TransformOperation]] which encapsulates the algorithm's logic
+    * @param transformParameters Additional parameters for the [[TransformDataSetOperation]]
+    * @param transformOperation [[TransformDataSetOperation]] which encapsulates the algorithm's
+    *                          logic
     * @tparam Input Input data type
     * @tparam Output Output data type
     * @return
@@ -58,10 +63,10 @@ trait Transformer[Self <: Transformer[Self]]
   def transform[Input, Output](
       input: DataSet[Input],
       transformParameters: ParameterMap = ParameterMap.Empty)
-      (implicit transformOperation: TransformOperation[Self, Input, Output])
+      (implicit transformOperation: TransformDataSetOperation[Self, Input, Output])
     : DataSet[Output] = {
     FlinkMLTools.registerFlinkMLTypes(input.getExecutionEnvironment)
-    transformOperation.transform(that, transformParameters, input)
+    transformOperation.transformDataSet(that, transformParameters, input)
   }
 
   /** Chains two [[Transformer]] to form a [[ChainedTransformer]].
@@ -86,93 +91,74 @@ trait Transformer[Self <: Transformer[Self]]
 }
 
 object Transformer{
-
-  /** Fallback [[TransformOperation]] for [[ChainedTransformer]] which is used if no suitable
-    * [[TransformOperation]] implementation can be found. This implementation is used if there is no
-    * [[TransformOperation]] for one of the leaves of the [[ChainedTransformer]] for the given
-    * input types. This is usually the case if one [[Transformer]] does not support the transform
-    * operation for the input type.
-    *
-    * The fallback [[TransformOperation]] for [[ChainedTransformer]] calls first the transform
-    * operation of the left transformer and then the transform operation of the right transformer.
-    * That way the fallback [[TransformOperation]] for a [[Transformer]] will be called which
-    * will fail the job in the pre-flight phase by throwing an exception.
-    *
-    * @param transformLeft Left [[Transformer]] of the pipeline
-    * @param transformRight Right [[Transformer]] of the pipeline
-    * @tparam L Type of the left [[Transformer]]
-    * @tparam R Type of the right [[Transformer]]
-    * @tparam LI Input type of left transformer's [[TransformOperation]]
-    * @tparam LO Output type of left transformer's [[TransformOperation]]
-    * @tparam RO Output type of right transformer's [[TransformOperation]]
-    * @return
-    */
-  implicit def fallbackChainedTransformOperation[
-      L <: Transformer[L],
-      R <: Transformer[R],
-      LI,
-      LO,
-      RO]
-      (implicit transformLeft: TransformOperation[L, LI, LO],
-      transformRight: TransformOperation[R, LO, RO])
-    : TransformOperation[ChainedTransformer[L,R], LI, RO] = {
-
-    new TransformOperation[ChainedTransformer[L, R], LI, RO] {
-      override def transform(
-          chain: ChainedTransformer[L, R],
-          transformParameters: ParameterMap,
-          input: DataSet[LI]): DataSet[RO] = {
-        val intermediate = transformLeft.transform(chain.left, transformParameters, input)
-        transformRight.transform(chain.right, transformParameters, intermediate)
-      }
-    }
-  }
-
-  /** Fallback [[TransformOperation]] for [[Transformer]] which do not support the input or output
-    * type with which they are called. This is usualy the case if pipeline operators are chained
-    * which have incompatible input/output types. In order to detect these failures, the fallback
-    * [[TransformOperation]] throws a [[RuntimeException]] with the corresponding input/output
-    * types. Consequently, a wrong pipeline will be detected at pre-flight phase of Flink and
-    * thus prior to execution time.
-    *
-    * @tparam Self Type of the [[Transformer]] for which the [[TransformOperation]] is defined
-    * @tparam IN Input data type of the [[TransformOperation]]
-    * @return
-    */
-  implicit def fallbackTransformOperation[
-      Self: ClassTag,
-      IN: ClassTag]
-    : TransformOperation[Self, IN, Any] = {
-    new TransformOperation[Self, IN, Any] {
-      override def transform(
-          instance: Self,
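+  /** Default [[TransformDataSetOperation]] which takes a [[TransformOperation]] to transform
+    * each element of the input [[DataSet]] with respect to the [[Transformer]]'s model.
+    */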
+  implicit def defaultTransformDataSetOperation[
+      Instance <: Estimator[Instance],
+      Model,
+      Input,
+      Output](
+      implicit transformOperation: TransformOperation[Instance, Model, Input, Output],
+      outputTypeInformation: TypeInformation[Output],
+      outputClassTag: ClassTag[Output])
+    : TransformDataSetOperation[Instance, Input, Output] = {
+    new TransformDataSetOperation[Instance, Input, Output] {
+      override def transformDataSet(
+          instance: Instance,
           transformParameters: ParameterMap,
-          input: DataSet[IN])
-        : DataSet[Any] = {
-        val self = implicitly[ClassTag[Self]]
-        val in = implicitly[ClassTag[IN]]
+          input: DataSet[Input])
+        : DataSet[Output] = {
+        val resultingParameters = instance.parameters ++ transformParameters
+        val model = transformOperation.getModel(instance, resultingParameters)
 
-        throw new RuntimeException("There is no TransformOperation defined for " +
-          self.runtimeClass +  " which takes a DataSet[" + in.runtimeClass +
-          "] as input.")
+        input.mapWithBcVariable(model){
+          (element, model) => transformOperation.transform(element, model)
+        }
       }
     }
   }
 }
 
-/** Type class for a transform operation of [[Transformer]].
+/** Type class for a transform operation of [[Transformer]]. This works on [[DataSet]]s of elements.
   *
-  * The [[TransformOperation]] contains a self type parameter so that the Scala compiler looks into
-  * the companion object of this class to find implicit values.
+  * The [[TransformDataSetOperation]] contains a self type parameter so that the Scala compiler
+  * looks into the companion object of this class to find implicit values.
   *
-  * @tparam Self Type of the [[Transformer]] for which the [[TransformOperation]] is defined
+  * @tparam Instance Type of the [[Transformer]] for which the [[TransformDataSetOperation]] is
+  *                  defined
   * @tparam Input Input data type
   * @tparam Output Output data type
   */
-abstract class TransformOperation[Self, Input, Output] extends Serializable{
-  def transform(
-      instance: Self,
+trait TransformDataSetOperation[Instance, Input, Output] extends Serializable{
+  def transformDataSet(
+      instance: Instance,
       transformParameters: ParameterMap,
       input: DataSet[Input])
     : DataSet[Output]
 }
+
+/** Type class for a transform operation which works on a single element and the corresponding model
+  * of the [[Transformer]].
+  *
+  * @tparam Instance Type of the [[Transformer]]
+  * @tparam Model Type of the model
+  * @tparam Input Type of a single input element
+  * @tparam Output Type of a single output element
+  */
+trait TransformOperation[Instance, Model, Input, Output] extends Serializable{
+
+  /** Retrieves the model of the [[Transformer]] for which this operation has been defined.
+    *
+    * @param instance Instance of the [[Transformer]]
+    * @param transformParameters Additional parameters for the transformation
+    * @return The model of the [[Transformer]]
+    */
+  def getModel(instance: Instance, transformParameters: ParameterMap): DataSet[Model]
+
+  /** Transforms a single element with respect to the model associated with the respective
+    * [[Transformer]].
+    *
+    * @param element Input element to be transformed
+    * @param model Model of the [[Transformer]]
+    * @return The transformed element
+    */
+  def transform(element: Input, model: Model): Output
+}
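
Analogously, an element-wise transformer only has to provide a FitOperation and a
TransformOperation; the default TransformDataSetOperation above lifts the per-element
transform to DataSets. A minimal hedged sketch with a made-up OffsetTransformer:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.ParameterMap
    import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer}

    class OffsetTransformer extends Transformer[OffsetTransformer] {
      var offsetOption: Option[DataSet[Double]] = None
    }

    object OffsetTransformer {
      implicit val fitOffset = new FitOperation[OffsetTransformer, Double] {
        override def fit(
            instance: OffsetTransformer,
            fitParameters: ParameterMap,
            input: DataSet[Double]): Unit = {
          // learn the offset as the minimum of the training values
          instance.offsetOption = Some(input.reduce((a, b) => math.min(a, b)))
        }
      }

      implicit val transformDoubles =
        new TransformOperation[OffsetTransformer, Double, Double, Double] {
          override def getModel(
              instance: OffsetTransformer,
              transformParameters: ParameterMap): DataSet[Double] = {
            instance.offsetOption.getOrElse(
              throw new RuntimeException("OffsetTransformer has not been fitted."))
          }

          // shift every element by the learned offset
          override def transform(element: Double, model: Double): Double = element - model
        }
    }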

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
index bded9c6..217e2c2 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
@@ -25,7 +25,8 @@ import org.apache.flink.ml._
 import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
 import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
-import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer}
+import org.apache.flink.ml.pipeline.{TransformDataSetOperation, FitOperation, Transformer}
 import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min}
 
 import scala.reflect.ClassTag
@@ -161,18 +162,18 @@ object MinMaxScaler {
     minMax
   }
 
-  /** [[TransformOperation]] which scales input data of subtype of [[Vector]] with respect to
+  /** [[TransformDataSetOperation]] which scales input data of subtype of [[Vector]] with respect to
     * the calculated minimum and maximum of the training data. The minimum and maximum
     * values of the resulting data is configurable.
     *
     * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
-    * @return [[TransformOperation]] scaling subtypes of [[Vector]] such that the feature values are
-    *        in the configured range
+    * @return [[TransformDataSetOperation]] scaling subtypes of [[Vector]] such that the feature
+    *        values are in the configured range
     */
   implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
   = {
-    new TransformOperation[MinMaxScaler, T, T] {
-      override def transform(
+    new TransformDataSetOperation[MinMaxScaler, T, T] {
+      override def transformDataSet(
         instance: MinMaxScaler,
         transformParameters: ParameterMap,
         input: DataSet[T])
@@ -201,8 +202,8 @@ object MinMaxScaler {
   }
 
   implicit val transformLabeledVectors = {
-    new TransformOperation[MinMaxScaler, LabeledVector, LabeledVector] {
-      override def transform(instance: MinMaxScaler,
+    new TransformDataSetOperation[MinMaxScaler, LabeledVector, LabeledVector] {
+      override def transformDataSet(instance: MinMaxScaler,
         transformParameters: ParameterMap,
         input: DataSet[LabeledVector]): DataSet[LabeledVector] = {
         val resultingParameters = instance.parameters ++ transformParameters

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
index 8c7daad..f1c788e 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.{DataSet, _}
 import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.{Vector, VectorBuilder}
-import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
 import org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree
 
 import scala.reflect.ClassTag
@@ -96,8 +96,8 @@ object PolynomialFeatures{
     }
   }
 
-  /** [[org.apache.flink.ml.pipeline.TransformOperation]] to map a [[Vector]] into the polynomial
-    * feature space.
+  /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[Vector]] into the
+    * polynomial feature space.
     *
     * @tparam T Subclass of [[Vector]]
     * @return
@@ -105,8 +105,8 @@ object PolynomialFeatures{
   implicit def transformVectorIntoPolynomialBase[
       T <: Vector : VectorBuilder: TypeInformation: ClassTag
     ] = {
-    new TransformOperation[PolynomialFeatures, T, T] {
-      override def transform(
+    new TransformDataSetOperation[PolynomialFeatures, T, T] {
+      override def transformDataSet(
           instance: PolynomialFeatures,
           transformParameters: ParameterMap,
           input: DataSet[T])
@@ -124,13 +124,13 @@ object PolynomialFeatures{
     }
   }
 
-  /** [[org.apache.flink.ml.pipeline.TransformOperation]] to map a [[LabeledVector]] into the
+  /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[LabeledVector]] into the
     * polynomial feature space
     */
   implicit val transformLabeledVectorIntoPolynomialBase =
-    new TransformOperation[PolynomialFeatures, LabeledVector, LabeledVector] {
+    new TransformDataSetOperation[PolynomialFeatures, LabeledVector, LabeledVector] {
 
-    override def transform(
+    override def transformDataSet(
         instance: PolynomialFeatures,
         transformParameters: ParameterMap,
         input: DataSet[LabeledVector])

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index bf09b20..c62657f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -23,11 +23,11 @@ import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.ml._
 import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
 import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
-import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, Transformer}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation,
+Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
 
 import scala.reflect.ClassTag
@@ -189,68 +189,93 @@ object StandardScaler {
     metrics
   }
 
-  /** [[TransformOperation]] which scales input data of subtype of [[Vector]] with respect to
-    * the calculated mean and standard deviation of the training data. The mean and standard
-    * deviation of the resulting data is configurable.
+  /** Base class for StandardScaler's [[TransformOperation]]. This class has to be extended for
+    * all types which are supported by [[StandardScaler]]'s transform operation.
     *
-    * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
+    * @tparam T Type of the elements handled by this transform operation
+    */
+  abstract class StandardScalerTransformOperation[T: TypeInformation: ClassTag]
+    extends TransformOperation[
+        StandardScaler,
+        (linalg.Vector[Double], linalg.Vector[Double]),
+        T,
+        T] {
+
+    var mean: Double = _
+    var std: Double = _
+
+    override def getModel(
+      instance: StandardScaler,
+      transformParameters: ParameterMap)
+    : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
+      mean = transformParameters(Mean)
+      std = transformParameters(Std)
+
+      instance.metricsOption match {
+        case Some(metrics) => metrics
+        case None =>
+          throw new RuntimeException("The StandardScaler has not been fitted to the data. " +
+            "This is necessary to estimate the mean and standard deviation of the data.")
+      }
+    }
+
+    def scale[V <: Vector: BreezeVectorConverter](
+      vector: V,
+      model: (linalg.Vector[Double], linalg.Vector[Double]))
+    : V = {
+      val (broadcastMean, broadcastStd) = model
+      var myVector = vector.asBreeze
+      myVector -= broadcastMean
+      myVector :/= broadcastStd
+      myVector = (myVector :* std) + mean
+      myVector.fromBreeze
+    }
+  }
+
+  /** [[TransformOperation]] to transform [[Vector]] types
+    *
+    * @tparam T Subtype of [[Vector]] to be scaled
     * @return
     */
   implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
-    new TransformOperation[StandardScaler, T, T] {
+    new StandardScalerTransformOperation[T]() {
       override def transform(
-        instance: StandardScaler,
-        transformParameters: ParameterMap,
-        input: DataSet[T])
-      : DataSet[T] = {
-
-        val resultingParameters = instance.parameters ++ transformParameters
-        val mean = resultingParameters(Mean)
-        val std = resultingParameters(Std)
-
-        instance.metricsOption match {
-          case Some(metrics) => {
-            input.mapWithBcVariable(metrics){
-              (vector, metrics) => {
-                val (broadcastMean, broadcastStd) = metrics
-                scaleVector(vector, broadcastMean, broadcastStd, mean, std)
-              }
-            }
-          }
+            vector: T,
+            model: (linalg.Vector[Double], linalg.Vector[Double]))
+        : T = {
+        scale(vector, model)
+      }
+    }
+  }
 
-          case None =>
-            throw new RuntimeException("The StandardScaler has not been fitted to the data. " +
-              "This is necessary to estimate the mean and standard deviation of the data.")
-        }
+  /** [[TransformOperation]] to transform tuples of type ([[Vector]], [[Double]]).
+    *
+    * @tparam T Subtype of [[Vector]] in the first tuple field
+    * @return
+    */
+  implicit def transformTupleVectorDouble[
+      T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
+    new StandardScalerTransformOperation[(T, Double)] {
+      override def transform(
+          element: (T, Double),
+          model: (linalg.Vector[Double], linalg.Vector[Double]))
+        : (T, Double) = {
+        (scale(element._1, model), element._2)
       }
     }
   }
 
-  implicit val transformLabeledVectors = {
-    new TransformOperation[StandardScaler, LabeledVector, LabeledVector] {
-      override def transform(instance: StandardScaler, transformParameters: ParameterMap, input:
-      DataSet[LabeledVector]): DataSet[LabeledVector] = {
-        val resultingParameters = instance.parameters ++ transformParameters
-        val mean = resultingParameters(Mean)
-        val std = resultingParameters(Std)
-
-        instance.metricsOption match {
-          case Some(metrics) => {
-            input.mapWithBcVariable(metrics){
-              (labeledVector, metrics) => {
-                val (broadcastMean, broadcastStd) = metrics
-                val LabeledVector(label, vector) = labeledVector
-
-                LabeledVector(label, scaleVector(vector, broadcastMean, broadcastStd, mean, std))
-              }
-            }
-          }
+  /** [[TransformOperation]] to transform [[LabeledVector]].
+    *
+    */
+  implicit val transformLabeledVector = new StandardScalerTransformOperation[LabeledVector] {
+    override def transform(
+        element: LabeledVector,
+        model: (linalg.Vector[Double], linalg.Vector[Double]))
+      : LabeledVector = {
+      val LabeledVector(label, vector) = element
 
-          case None =>
-            throw new RuntimeException("The StandardScaler has not been fitted to the data. " +
-              "This is necessary to estimate the mean and standard deviation of the data.")
-        }
-      }
+      LabeledVector(label, scale(vector, model))
     }
   }
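
The refactoring above centralizes the scaling logic in a single scale method:
each element x is mapped to ((x - dataMean) / dataStd) * std + mean, where
dataMean and dataStd come from the fitted metrics and mean/std are the
configured target moments. With the three implicits, one fitted scaler now
handles Vector, (Vector, Double) and LabeledVector inputs alike. A sketch,
assuming StandardScaler's setMean/setStd parameter setters and illustrative
data:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.math.DenseVector
    import org.apache.flink.ml.preprocessing.StandardScaler

    val env = ExecutionEnvironment.getExecutionEnvironment
    val vectors = env.fromCollection(Seq(DenseVector(1.0), DenseVector(3.0)))

    val scaler = StandardScaler().setMean(0.0).setStd(1.0)
    scaler.fit(vectors)

    // the same transform call resolves a different implicit per input type;
    // for tuples, the Double in the second field passes through unscaled
    val scaledVectors = scaler.transform(vectors)
    val scaledPairs   = scaler.transform(vectors.map(v => (v, 42.0)))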
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index 04174d3..d8af42f 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -25,7 +25,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 import org.apache.flink.ml.common._
-import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor}
 import org.apache.flink.types.Value
 import org.apache.flink.util.Collector
 import org.apache.flink.api.common.functions.{Partitioner => FlinkPartitioner, GroupReduceFunction, CoGroupFunction}
@@ -390,8 +390,8 @@ object ALS {
   // ===================================== Operations ==============================================
 
   /** Predict operation which calculates the matrix entry for the given indices  */
-  implicit val predictRating = new PredictOperation[ALS, (Int, Int), (Int ,Int, Double)] {
-    override def predict(
+  implicit val predictRating = new PredictDataSetOperation[ALS, (Int, Int), (Int, Int, Double)] {
+    override def predictDataSet(
         instance: ALS,
         predictParameters: ParameterMap,
         input: DataSet[(Int, Int)])
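
For ALS the former PredictOperation becomes a PredictDataSetOperation:
computing a rating requires joining the (user, item) input with both factor
matrices, so it cannot be expressed as a per-element map over a single
broadcast model. The call site is unaffected; a sketch with illustrative
parameter values and assumed input data sets:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.recommendation.ALS

    val als = ALS()
      .setNumFactors(10)
      .setIterations(10)

    // ratingsDS (assumed): DataSet[(Int, Int, Double)] of (user, item, rating)
    als.fit(ratingsDS)

    // consumes (user, item) pairs, yields (user, item, predicted rating)
    val predictions = als.predict(userItemPairsDS)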

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
index 439d038..c3b3182 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/regression/MultipleLinearRegression.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.ml.regression
 
 import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{Breeze, Vector}
 import org.apache.flink.ml.common._
 
 import org.apache.flink.api.scala._
 
-import org.apache.flink.ml.optimization.{LinearPrediction, SquaredLoss, GenericLossFunction,
-SimpleGradientDescent}
-import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+import org.apache.flink.ml.optimization.{LinearPrediction, SquaredLoss, GenericLossFunction, SimpleGradientDescent}
+import org.apache.flink.ml.pipeline.{PredictOperation, FitOperation, Predictor}
+
 
 /** Multiple linear regression using the ordinary least squares (OLS) estimator.
   *
@@ -128,8 +128,6 @@ class MultipleLinearRegression extends Predictor[MultipleLinearRegression] {
 
 object MultipleLinearRegression {
 
-  import org.apache.flink.ml._
-
   val WEIGHTVECTOR_BROADCAST = "weights_broadcast"
 
   val lossFunction = GenericLossFunction(SquaredLoss, LinearPrediction)
@@ -189,56 +187,13 @@ object MultipleLinearRegression {
     }
   }
 
-  /** Calculates the predictions for new data with respect to the learned linear model.
-    *
-    * @tparam T Testing data type for which the prediction is calculated. Has to be a subtype of
-    *           [[Vector]]
-    * @return [[PredictOperation]] which calculates for a given vector its label according to the
-    *        linear model. The result of this [[PredictOperation]] is a [[LabeledVector]]
-    */
   implicit def predictVectors[T <: Vector] = {
-    new PredictOperation[MultipleLinearRegression, T, LabeledVector] {
-      override def predict(
-        instance: MultipleLinearRegression,
-        predictParameters: ParameterMap,
-        input: DataSet[T])
-      : DataSet[LabeledVector] = {
-        instance.weightsOption match {
-          case Some(weights) => {
-            input.mapWithBcVariable(weights) {
-              (dataPoint, weights) =>
-                LabeledVector(LinearPrediction.predict(dataPoint, weights), dataPoint)
-            }
-          }
+    new PredictOperation[MultipleLinearRegression, WeightVector, T, Double]() {
+      override def getModel(self: MultipleLinearRegression, predictParameters: ParameterMap)
+        : DataSet[WeightVector] = {
+        self.weightsOption match {
+          case Some(weights) => weights
 
-          case None => {
-            throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " +
-              "data. This is necessary to learn the weight vector of the linear function.")
-          }
-        }
-      }
-    }
-  }
-
-  /** Calculates the predictions for labeled data with respect to the learned linear model.
-    *
-    * @return A DataSet[(Double, Double)] where each tuple is a (truth, prediction) pair.
-    */
-  implicit def predictLabeledVectors = {
-    new PredictOperation[MultipleLinearRegression, LabeledVector, (Double, Double)] {
-      override def predict(
-        instance: MultipleLinearRegression,
-        predictParameters: ParameterMap,
-        input: DataSet[LabeledVector])
-      : DataSet[(Double, Double)] = {
-        instance.weightsOption match {
-          case Some(weights) => {
-            input.mapWithBcVariable(weights) {
-              (labeledVector, weights) => {
-                (labeledVector.label, LinearPrediction.predict(labeledVector.vector, weights))
-              }
-            }
-          }
 
           case None => {
             throw new RuntimeException("The MultipleLinearRegression has not been fitted to the " +
@@ -246,6 +201,12 @@ object MultipleLinearRegression {
           }
         }
       }
+      override def predict(value: T, model: WeightVector): Double = {
+        import Breeze._
+        val WeightVector(weights, weight0) = model
+        val dotProduct = value.asBreeze.dot(weights.asBreeze)
+        dotProduct + weight0
+      }
     }
   }
 }
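
The two DataSet-level predict operations collapse into a single element-wise
PredictOperation: getModel exposes the learned WeightVector once, and predict
computes the affine prediction w . x + w0 for each input vector. The generic
evaluate method of Predictor reuses exactly this function on (vector, label)
pairs to emit (truth, prediction) tuples, which the adapted test suites below
rely on. A sketch, with illustrative parameter values and assumed input data
sets:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val mlr = MultipleLinearRegression()
      .setIterations(10)
      .setStepsize(0.5)

    // trainingDS (assumed): DataSet[LabeledVector]
    mlr.fit(trainingDS)

    // one prediction w . x + w0 per input vector of the assumed vectorDS
    val predictions = mlr.predict(vectorDS)

    // (truth, prediction) pairs from an assumed DataSet[(Vector, Double)]
    val evaluation: DataSet[(Double, Double)] = mlr.evaluate(evaluationDS)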

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
index 25c2afb..b1a91a2 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
@@ -40,11 +40,13 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     val trainingDS = env.fromCollection(Classification.trainingData)
 
+    val testingDS = trainingDS.map(_.vector)
+
     svm.fit(trainingDS)
 
     val weightVector = svm.weightsOption.get.collect().apply(0)
 
-    weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
+    weightVector.valueIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
       case (weight, expectedWeight) =>
         weight should be(expectedWeight +- 0.1)
     }
@@ -63,11 +65,13 @@ class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     val trainingDS = env.fromCollection(Classification.trainingData)
 
+    val test = trainingDS.map(x => (x.vector, x.label))
+
     svm.fit(trainingDS)
 
     val threshold = 0.0
 
-    val predictionPairs = svm.predict(trainingDS).map {
+    val predictionPairs = svm.evaluate(test).map {
       truthPrediction =>
         val truth = truthPrediction._1
         val prediction = truthPrediction._2
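
The suite now exercises the new evaluate path: it derives (vector, label)
pairs from the training data and compares each raw prediction against the
truth after thresholding. Condensed into an equivalent check (a sketch, not
the verbatim suite code):

    val evaluationPairs = svm.evaluate(test) // DataSet[(Double, Double)]

    val correct = evaluationPairs.collect().count {
      case (truth, prediction) =>
        // the raw SVM output is mapped to a class label by thresholding at 0.0
        (if (prediction >= threshold) 1.0 else -1.0) == truth
    }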

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala
deleted file mode 100644
index 674c1c4..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/feature/PolynomialFeaturesITSuite.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.feature
-
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.ml.common.LabeledVector
-import org.apache.flink.ml.math.DenseVector
-import org.apache.flink.ml.preprocessing.PolynomialFeatures
-import org.scalatest.{Matchers, FlatSpec}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.test.util.FlinkTestBase
-
-class PolynomialFeaturesITSuite
-  extends FlatSpec
-  with Matchers
-  with FlinkTestBase {
-
-  behavior of "The polynomial base implementation"
-
-  it should "map single element vectors to the polynomial vector space" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism (2)
-
-    val input = Seq (
-    LabeledVector (1.0, DenseVector (1)),
-    LabeledVector (2.0, DenseVector (2))
-    )
-
-    val inputDS = env.fromCollection (input)
-
-    val transformer = PolynomialFeatures()
-    .setDegree (3)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val expectedMap = List (
-    (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
-    (2.0 -> DenseVector (8.0, 4.0, 2.0) )
-    ) toMap
-
-    val result = transformedDS.collect()
-
-    for (entry <- result) {
-    expectedMap.contains (entry.label) should be (true)
-    entry.vector should equal (expectedMap (entry.label) )
-    }
-  }
-
-  it should "map vectors to the polynomial vector space" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val input = Seq(
-      LabeledVector(1.0, DenseVector(2, 3)),
-      LabeledVector(2.0, DenseVector(2, 3, 4))
-    )
-
-    val expectedMap = List(
-      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
-      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
-        9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
-    ) toMap
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialFeatures()
-      .setDegree(3)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val result = transformedDS.collect()
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-  }
-
-  it should "return an empty vector if the max degree is zero" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    env.setParallelism(2)
-
-    val input = Seq(
-      LabeledVector(1.0, DenseVector(2, 3)),
-      LabeledVector(2.0, DenseVector(2, 3, 4))
-    )
-
-    val inputDS = env.fromCollection(input)
-
-    val transformer = PolynomialFeatures()
-      .setDegree(0)
-
-    val transformedDS = transformer.transform(inputDS)
-
-    val result = transformedDS.collect()
-
-    val expectedMap = List(
-      (1.0 -> DenseVector()),
-      (2.0 -> DenseVector())
-    ) toMap
-
-    for(entry <- result) {
-      expectedMap.contains(entry.label) should be(true)
-      entry.vector should equal(expectedMap(entry.label))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
index c25ad79..a3ea086 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/pipeline/PipelineITSuite.scala
@@ -97,17 +97,17 @@ class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
       pipeline.fit(vectorData)
     }
 
-    exceptionFit.getMessage should equal("There is no FitOperation defined for class org.apache." +
+    exceptionFit.getMessage should equal("There is no FitOperation defined for org.apache." +
       "flink.ml.regression.MultipleLinearRegression which trains on a " +
-      "DataSet[class org.apache.flink.ml.math.DenseVector]")
+      "DataSet[org.apache.flink.ml.math.DenseVector]")
 
     // fit the pipeline so that the StandardScaler won't fail when predict is called on the pipeline
     pipeline.fit(labeledData)
 
    // make sure that we have TransformDataSetOperation[StandardScaler, Double, Double]
     implicit val standardScalerDoubleTransform =
-      new TransformOperation[StandardScaler, Double, Double] {
-        override def transform(instance: StandardScaler, transformParameters: ParameterMap,
+      new TransformDataSetOperation[StandardScaler, Double, Double] {
+        override def transformDataSet(instance: StandardScaler, transformParameters: ParameterMap,
           input: DataSet[Double]): DataSet[Double] = {
           input
         }
@@ -117,9 +117,9 @@ class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
       pipeline.predict(doubleData)
     }
 
-    exceptionPredict.getMessage should equal("There is no PredictOperation defined for class " +
+    exceptionPredict.getMessage should equal("There is no PredictOperation defined for " +
       "org.apache.flink.ml.regression.MultipleLinearRegression which takes a " +
-      "DataSet[double] as input.")
+      "DataSet[Double] as input.")
   }
 
   it should "throw an exception when the input data is not supported" in {
@@ -137,15 +137,15 @@ class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
       pipeline.fit(doubleData)
     }
 
-    exceptionFit.getMessage should equal("There is no FitOperation defined for class org.apache." +
-      "flink.ml.preprocessing.StandardScaler which trains on a DataSet[double]")
+    exceptionFit.getMessage should equal("There is no FitOperation defined for org.apache." +
+      "flink.ml.preprocessing.StandardScaler which trains on a DataSet[Double]")
 
     val exceptionTransform = intercept[RuntimeException] {
       pipeline.transform(doubleData)
     }
 
-    exceptionTransform.getMessage should equal("There is no TransformOperation defined for class " +
-      "org.apache.flink.ml.preprocessing.StandardScaler which takes a DataSet[double] as input.")
+    exceptionTransform.getMessage should equal("There is no TransformOperation defined for " +
+      "org.apache.flink.ml.preprocessing.StandardScaler which takes a DataSet[Double] as input.")
   }
 
   it should "support multiple transformers and a predictor" in {
@@ -154,8 +154,12 @@ class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
     val data = List(LabeledVector(1.0, DenseVector(1.0, 2.0)),
       LabeledVector(2.0, DenseVector(2.0, 3.0)),
       LabeledVector(3.0, DenseVector(3.0, 4.0)))
+    val testing = data.map(_.vector)
+    val evaluation = data.map(x => (x.vector, x.label))
 
     val trainingData = env.fromCollection(data)
+    val testingData = env.fromCollection(testing)
+    val evaluationData = env.fromCollection(evaluation)
 
     val chainedScalers2 = StandardScaler().chainTransformer(StandardScaler())
     val chainedScalers3 = chainedScalers2.chainTransformer(StandardScaler())
@@ -175,6 +179,17 @@ class PipelineITSuite extends FlatSpec with Matchers with FlinkTestBase {
     }
 
     weightVector.intercept should be (0.807924 +- 0.01)
+
+    val predictionDS = pipeline.predict(testingData)
+
+    val predictionResult = predictionDS.collect()
+
+    val evaluationDS = pipeline.evaluate(evaluationData)
+
+    val evaluationResult = evaluationDS.collect()
+
+    predictionResult.size should be(testing.size)
+    evaluationResult.size should be(evaluation.size)
   }
 
   it should "throw an exception when the input data is not supported by a predictor" in {

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
new file mode 100644
index 0000000..006db5f
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.preprocessing
+
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+class PolynomialFeaturesITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "The polynomial base implementation"
+
+  it should "map single element vectors to the polynomial vector space" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(1)),
+      LabeledVector(2.0, DenseVector(2))
+    )
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialFeatures()
+      .setDegree(3)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val expectedMap = List(
+      (1.0 -> DenseVector(1.0, 1.0, 1.0)),
+      (2.0 -> DenseVector(8.0, 4.0, 2.0))
+    ).toMap
+
+    val result = transformedDS.collect()
+
+    for (entry <- result) {
+      expectedMap.contains(entry.label) should be (true)
+      entry.vector should equal (expectedMap(entry.label))
+    }
+  }
+
+  it should "map vectors to the polynomial vector space" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(2, 3)),
+      LabeledVector(2.0, DenseVector(2, 3, 4))
+    )
+
+    val expectedMap = List(
+      (1.0 -> DenseVector(8.0, 12.0, 18.0, 27.0, 4.0, 6.0, 9.0, 2.0, 3.0)),
+      (2.0 -> DenseVector(8.0, 12.0, 16.0, 18.0, 24.0, 32.0, 27.0, 36.0, 48.0, 64.0, 4.0, 6.0, 8.0,
+        9.0, 12.0, 16.0, 2.0, 3.0, 4.0))
+    ).toMap
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialFeatures()
+      .setDegree(3)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect()
+
+    for(entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+
+  it should "return an empty vector if the max degree is zero" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(2, 3)),
+      LabeledVector(2.0, DenseVector(2, 3, 4))
+    )
+
+    val inputDS = env.fromCollection(input)
+
+    val transformer = PolynomialFeatures()
+      .setDegree(0)
+
+    val transformedDS = transformer.transform(inputDS)
+
+    val result = transformedDS.collect()
+
+    val expectedMap = List(
+      (1.0 -> DenseVector()),
+      (2.0 -> DenseVector())
+    ).toMap
+
+    for(entry <- result) {
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
+    }
+  }
+}
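
For reference, the expected values follow the embedding computed by
PolynomialFeatures: a vector is mapped onto all monomials of its components
from the configured degree down to degree one, highest degree first. For
DenseVector(2, 3) with degree 3 this gives (2^3, 2^2*3, 2*3^2, 3^3, 2^2, 2*3,
3^2, 2, 3) = (8, 12, 18, 27, 4, 6, 9, 2, 3), matching the first expectedMap
entry above, and degree 0 accordingly yields the empty vector.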

http://git-wip-us.apache.org/repos/asf/flink/blob/7a7a2940/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index e42b87d..4e78ba5 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -121,9 +121,11 @@ class MultipleLinearRegressionITSuite
     parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
 
     val inputDS = env.fromCollection(data)
+    val evaluationDS = inputDS.map(x => (x.vector, x.label))
+
     mlr.fit(inputDS, parameters)
 
-    val predictionPairs = mlr.predict(inputDS)
+    val predictionPairs = mlr.evaluate(evaluationDS)
 
     val absoluteErrorSum = predictionPairs.collect().map{
       case (truth, prediction) => Math.abs(truth - prediction)}.sum


[2/2] flink git commit: [FLINK-2214] [ml] Fixes prediction join operation and empirical risk join operation of ALS by giving join hint

Posted by tr...@apache.org.
[FLINK-2214] [ml] Fixes prediction join operation and empirical risk join operation of ALS by giving join hint

This closes #844.
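
JoinHint.REPARTITION_HASH_SECOND instructs the optimizer to repartition both
inputs by the join key and build the hash table over the second input (here
the factor matrices), rather than leaving the strategy choice, for example a
broadcast, to size estimates that can be misleading for these inputs. The
hinted join shape used in the patch below, with illustrative names:

    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
    import org.apache.flink.api.scala._

    // join (user, item) pairs with the user factors; repartition-based hash
    // join with the hash table built over the factors (the second input)
    val withUserFactors = userItemPairs
      .join(userFactors, JoinHint.REPARTITION_HASH_SECOND)
      .where(0)
      .equalTo(0)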


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/38ee125e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/38ee125e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/38ee125e

Branch: refs/heads/master
Commit: 38ee125e49544c329d3723e12bbf21c046af8964
Parents: 6ab0627
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 16 18:53:12 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jun 26 12:09:35 2015 +0200

----------------------------------------------------------------------
 .../scala/org/apache/flink/ml/recommendation/ALS.scala    | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/38ee125e/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
index c5db6e4..04174d3 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/recommendation/ALS.scala
@@ -20,6 +20,7 @@ package org.apache.flink.ml.recommendation
 
 import java.{util, lang}
 
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.operators.Order
 import org.apache.flink.core.memory.{DataOutputView, DataInputView}
@@ -206,8 +207,9 @@ class ALS extends Predictor[ALS] {
 
     factorsOption match {
       case Some((userFactors, itemFactors)) => {
-        val predictions = data.join(userFactors).where(0).equalTo(0)
-          .join(itemFactors).where("_1._2").equalTo(0).map {
+        val predictions = data.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0)
+          .equalTo(0).join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2")
+          .equalTo(0).map {
           triple => {
             val (((uID, iID), uFactors), iFactors) = triple
 
@@ -397,8 +399,8 @@ object ALS {
 
       instance.factorsOption match {
         case Some((userFactors, itemFactors)) => {
-          input.join(userFactors).where(0).equalTo(0)
-            .join(itemFactors).where("_1._2").equalTo(0).map {
+          input.join(userFactors, JoinHint.REPARTITION_HASH_SECOND).where(0).equalTo(0)
+            .join(itemFactors, JoinHint.REPARTITION_HASH_SECOND).where("_1._2").equalTo(0).map {
             triple => {
               val (((uID, iID), uFactors), iFactors) = triple