You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/06/09 10:29:52 UTC

[1/2] flink git commit: [FLINK-1844] [ml] Add MinMaxScaler implementation in the preprocessing package, tests for the corresponding functionality, and documentation.

Repository: flink
Updated Branches:
  refs/heads/master e0d60ddbd -> 97611c245


[FLINK-1844] [ml] Add MinMaxScaler implementation in the preprocessing package, tests for the corresponding functionality, and documentation.

This closes #798.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73f99117
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73f99117
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73f99117

Branch: refs/heads/master
Commit: 73f99117f783a1f5c3ae67ade4716512b3ddfd66
Parents: e0d60dd
Author: fobeligi <fa...@gmail.com>
Authored: Fri Jun 5 23:12:43 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 9 10:25:36 2015 +0200

----------------------------------------------------------------------
 docs/libs/ml/index.md                           |   1 +
 docs/libs/ml/min_max_scaler.md                  | 112 ++++++++
 .../flink/ml/preprocessing/MinMaxScaler.scala   | 264 +++++++++++++++++++
 .../ml/preprocessing/MinMaxScalerITSuite.scala  | 243 +++++++++++++++++
 4 files changed, 620 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/docs/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/index.md b/docs/libs/ml/index.md
index f61480f..de9137d 100644
--- a/docs/libs/ml/index.md
+++ b/docs/libs/ml/index.md
@@ -43,6 +43,7 @@ FlinkML currently supports the following algorithms:
 
 * [Polynomial Features](polynomial_features.html)
 * [Standard Scaler](standard_scaler.html)
+* [MinMax Scaler](min_max_scaler.html)
 
 ### Recommendation
 

http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/docs/libs/ml/min_max_scaler.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/min_max_scaler.md b/docs/libs/ml/min_max_scaler.md
new file mode 100644
index 0000000..0c00dcd
--- /dev/null
+++ b/docs/libs/ml/min_max_scaler.md
@@ -0,0 +1,112 @@
+---
+mathjax: include
+htmlTitle: FlinkML - MinMax Scaler
+title: <a href="../ml">FlinkML</a> - MinMax Scaler
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The MinMax scaler scales the given data set so that all values lie within a user-specified range [min,max].
+ In case the user does not provide a specific minimum and maximum value for the scaling range, the MinMax scaler transforms the features of the input data set to lie in the [0,1] interval.
+ Given a set of input data $x_1, x_2,... x_n$, with minimum value:
+
+ $$x_{min} = \min(x_1, x_2, \ldots, x_n)$$
+
+ and maximum value:
+
+ $$x_{max} = \max(x_1, x_2, \ldots, x_n)$$
+
+The scaled data set $z_1, z_2,...,z_n$ will be:
+
+ $$z_{i}= \frac{x_{i} - x_{min}}{x_{max} - x_{min}} \left ( max - min \right ) + min$$
+
+where $\textit{min}$ and $\textit{max}$ are the user specified minimum and maximum values of the range to scale.
+
+## Operations
+
+`MinMaxScaler` is a `Transformer`.
+As such, it supports the `fit` and `transform` operations.
+
+### Fit
+
+MinMaxScaler is trained on all subtypes of `Vector` or `LabeledVector`:
+
+* `fit[T <: Vector]: DataSet[T] => Unit`
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Transform
+
+MinMaxScaler transforms all subtypes of `Vector` or `LabeledVector` into the respective type:
+
+* `transform[T <: Vector]: DataSet[T] => DataSet[T]`
+* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]`
+
+## Parameters
+
+The MinMax scaler implementation can be controlled by the following two parameters:
+
+ <table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Parameters</th>
+      <th class="text-center">Description</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>Min</strong></td>
+      <td>
+        <p>
+          The minimum value of the range for the scaled data set. (Default value: <strong>0.0</strong>)
+        </p>
+      </td>
+    </tr>
+    <tr>
+      <td><strong>Max</strong></td>
+      <td>
+        <p>
+          The maximum value of the range for the scaled data set. (Default value: <strong>1.0</strong>)
+        </p>
+      </td>
+    </tr>
+  </tbody>
+</table>
+
+## Examples
+
+{% highlight scala %}
+// Create MinMax scaler transformer
+val minMaxscaler = MinMaxScaler()
+  .setMin(-1.0)
+
+// Obtain data set to be scaled
+val dataSet: DataSet[Vector] = ...
+
+// Learn the minimum and maximum values of the training data
+minMaxscaler.fit(dataSet)
+
+// Scale the provided data set to have min=-1.0 and max=1.0
+val scaledDS = minMaxscaler.transform(dataSet)
+{% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
new file mode 100644
index 0000000..bded9c6
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.linalg.{max, min}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer}
+import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min}
+
+import scala.reflect.ClassTag
+
+/** Scales observations, so that all features are in a user-specified range.
+  * By default, the [[MinMaxScaler]] transformer uses the range [0,1].
+  *
+  * This transformer takes a subtype of  [[Vector]] of values and maps it to a
+  * scaled subtype of [[Vector]] such that each feature lies between a user-specified range.
+  *
+  * This transformer can be prepended to all [[Transformer]] and
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype
+  * of [[Vector]] or a [[LabeledVector]].
+  *
+  * @example
+  * {{{
+  *               val trainingDS: DataSet[Vector] = env.fromCollection(data)
+  *               val transformer = MinMaxScaler().setMin(-1.0)
+  *
+  *               transformer.fit(trainingDS)
+  *               val transformedDS = transformer.transform(trainingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[Min]]: The minimum value of the range of the transformed data set; by default equal to 0
+  * - [[Max]]: The maximum value of the range of the transformed data set; by default
+  * equal to 1
+  */
+class MinMaxScaler extends Transformer[MinMaxScaler] {
+
+  private [preprocessing] var metricsOption: Option[
+      DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
+    ] = None
+
+  /** Sets the minimum for the range of the transformed data
+    *
+    * @param min the user-specified minimum value.
+    * @return the MinMaxScaler instance with its minimum value set to the user-specified value.
+    */
+  def setMin(min: Double): MinMaxScaler = {
+    parameters.add(Min, min)
+    this
+  }
+
+  /** Sets the maximum for the range of the transformed data
+    *
+    * @param max the user-specified maximum value.
+    * @return the MinMaxScaler instance with its maximum value set to the user-specified value.
+    */
+  def setMax(max: Double): MinMaxScaler = {
+    parameters.add(Max, max)
+    this
+  }
+}
+
+object MinMaxScaler {
+
+  // ====================================== Parameters =============================================
+
+  case object Min extends Parameter[Double] {
+    override val defaultValue: Option[Double] = Some(0.0)
+  }
+
+  case object Max extends Parameter[Double] {
+    override val defaultValue: Option[Double] = Some(1.0)
+  }
+
+  // ==================================== Factory methods ==========================================
+
+  def apply(): MinMaxScaler = {
+    new MinMaxScaler()
+  }
+
+  // ====================================== Operations =============================================
+
+  /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of each feature of the
+    * training data. These values are used in the transform step to transform the given input data.
+    *
+    * @tparam T Input data type which is a subtype of [[Vector]]
+    * @return [[FitOperation]] training the [[MinMaxScaler]] on subtypes of [[Vector]]
+    */
+  implicit def fitVectorMinMaxScaler[T <: Vector] = new FitOperation[MinMaxScaler, T] {
+    override def fit(instance: MinMaxScaler, fitParameters: ParameterMap, input: DataSet[T])
+    : Unit = {
+      val metrics = extractFeatureMinMaxVectors(input)
+
+      instance.metricsOption = Some(metrics)
+    }
+  }
+
+  /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of the features of the
+    * training data which is of type [[LabeledVector]]. The minimum and maximum are used to
+    * transform the given input data.
+    *
+    */
+  implicit val fitLabeledVectorMinMaxScaler = {
+    new FitOperation[MinMaxScaler, LabeledVector] {
+      override def fit(
+        instance: MinMaxScaler,
+        fitParameters: ParameterMap,
+        input: DataSet[LabeledVector])
+      : Unit = {
+        val vectorDS = input.map(_.vector)
+        val metrics = extractFeatureMinMaxVectors(vectorDS)
+
+        instance.metricsOption = Some(metrics)
+      }
+    }
+  }
+
+  /** Calculates in one pass over the data the features' minimum and maximum values.
+    *
+    * @param dataSet The data set for which we want to calculate the minimum and maximum values.
+    * @return  DataSet containing a single tuple of two vectors (minVector, maxVector).
+    *          The first vector represents the minimum values vector and the second is the maximum
+    *          values vector.
+    */
+  private def extractFeatureMinMaxVectors[T <: Vector](dataSet: DataSet[T])
+  : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
+
+    val minMax = dataSet.map {
+      v => (v.asBreeze, v.asBreeze)
+    }.reduce {
+      (minMax1, minMax2) => {
+
+        val tempMinimum = min(minMax1._1, minMax2._1)
+        val tempMaximum = max(minMax1._2, minMax2._2)
+
+        (tempMinimum, tempMaximum)
+      }
+    }
+    minMax
+  }
+
+  /** [[TransformOperation]] which scales input data of subtype of [[Vector]] with respect to
+    * the calculated minimum and maximum of the training data. The minimum and maximum
+    * values of the resulting data are configurable.
+    *
+    * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
+    * @return [[TransformOperation]] scaling subtypes of [[Vector]] such that the feature values are
+    *        in the configured range
+    */
+  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
+  = {
+    new TransformOperation[MinMaxScaler, T, T] {
+      override def transform(
+        instance: MinMaxScaler,
+        transformParameters: ParameterMap,
+        input: DataSet[T])
+      : DataSet[T] = {
+
+        val resultingParameters = instance.parameters ++ transformParameters
+        val min = resultingParameters(Min)
+        val max = resultingParameters(Max)
+
+        instance.metricsOption match {
+          case Some(metrics) => {
+            input.mapWithBcVariable(metrics) {
+              (vector, metrics) => {
+                val (broadcastMin, broadcastMax) = metrics
+                scaleVector(vector, broadcastMin, broadcastMax, min, max)
+              }
+            }
+          }
+
+          case None =>
+            throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " +
+              "This is necessary to estimate the minimum and maximum of the data.")
+        }
+      }
+    }
+  }
+
+  implicit val transformLabeledVectors = {
+    new TransformOperation[MinMaxScaler, LabeledVector, LabeledVector] {
+      override def transform(instance: MinMaxScaler,
+        transformParameters: ParameterMap,
+        input: DataSet[LabeledVector]): DataSet[LabeledVector] = {
+        val resultingParameters = instance.parameters ++ transformParameters
+        val min = resultingParameters(Min)
+        val max = resultingParameters(Max)
+
+        instance.metricsOption match {
+          case Some(metrics) => {
+            input.mapWithBcVariable(metrics) {
+              (labeledVector, metrics) => {
+                val (broadcastMin, broadcastMax) = metrics
+                val LabeledVector(label, vector) = labeledVector
+
+                LabeledVector(label, scaleVector(vector, broadcastMin, broadcastMax, min, max))
+              }
+            }
+          }
+
+          case None =>
+            throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " +
+              "This is necessary to estimate the minimum and maximum of the data.")
+        }
+      }
+    }
+  }
+
+  /** Scales a vector such that its features lie in the range [min, max]
+    *
+    * @param vector Vector to scale
+    * @param broadcastMin Vector containing for each feature the minimal value in the training set
+    * @param broadcastMax Vector containing for each feature the maximal value in the training set
+    * @param min Minimal value of range
+    * @param max Maximal value of range
+    * @tparam T Type of [[Vector]]
+    * @return Scaled feature vector
+    */
+  private def scaleVector[T <: Vector: BreezeVectorConverter](
+      vector: T,
+      broadcastMin: linalg.Vector[Double],
+      broadcastMax: linalg.Vector[Double],
+      min: Double,
+      max: Double)
+    : T = {
+    var myVector = vector.asBreeze
+
+    //handle the case where a feature takes only one value
+    val rangePerFeature = (broadcastMax - broadcastMin)
+    for (i <- 0 until rangePerFeature.size) {
+      if (rangePerFeature(i) == 0.0) {
+        rangePerFeature(i)= 1.0
+      }
+    }
+
+    myVector -= broadcastMin
+    myVector :/= rangePerFeature
+    myVector = (myVector :* (max - min)) + min
+    myVector.fromBreeze
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
new file mode 100644
index 0000000..75ac442
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg.{max, min}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class MinMaxScalerITSuite
+  extends FlatSpec
+  with Matchers
+  with FlinkTestBase {
+
+  behavior of "Flink's MinMax Scaler"
+
+  import MinMaxScalerData._
+
+  it should "scale the vectors' values to be restricted in the [0.0,1.0] range" in {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+    val minMaxScaler = MinMaxScaler()
+    minMaxScaler.fit(dataSet)
+    val scaledVectors = minMaxScaler.transform(dataSet).collect
+
+    scaledVectors.length should equal(data.length)
+
+    //ensure data lies in the user-specified range
+    for (vector <- scaledVectors) {
+      val test = vector.asBreeze.forall(fv => {
+        fv >= 0.0 && fv <= 1.0
+      })
+      test shouldEqual true
+    }
+
+    var expectedMin = data.head.asBreeze
+    var expectedMax = data.head.asBreeze
+
+    for (v <- data.tail) {
+      val tempVector = v.asBreeze
+      expectedMin = min(expectedMin, tempVector)
+      expectedMax = max(expectedMax, tempVector)
+    }
+
+    //ensure that estimated Min and Max vectors equal the expected ones
+    val estimatedMinMax = minMaxScaler.metricsOption.get.collect()
+    estimatedMinMax.head shouldEqual(expectedMin, expectedMax)
+
+    //handle the case where a feature takes only one value
+    val expectedRangePerFeature = (expectedMax - expectedMin)
+    for (i <- 0 until expectedRangePerFeature.size) {
+      if (expectedRangePerFeature(i) == 0.0) {
+        expectedRangePerFeature(i)= 1.0
+      }
+    }
+
+    //ensure that vectors were scaled correctly
+    for (i <- 0 until data.length) {
+      var expectedVector = data(i).asBreeze - expectedMin
+      expectedVector :/= expectedRangePerFeature
+      expectedVector = expectedVector :* (1.0 - 0.0)
+
+      expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i)
+    }
+  }
+
+  it should "scale vectors' values in the [-1.0,1.0] range" in {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(labeledData)
+    val minMaxScaler = MinMaxScaler().setMin(-1.0)
+    minMaxScaler.fit(dataSet)
+    val scaledVectors = minMaxScaler.transform(dataSet).collect
+
+    scaledVectors.length should equal(labeledData.length)
+
+    //ensure data lies in the user-specified range
+    for (labeledVector <- scaledVectors) {
+      val test = labeledVector.vector.asBreeze.forall(lv => {
+        lv >= -1.0 && lv <= 1.0
+      })
+      test shouldEqual true
+    }
+
+    var expectedMin = labeledData.head.vector.asBreeze
+    var expectedMax = labeledData.head.vector.asBreeze
+
+    for (v <- labeledData.tail) {
+      val tempVector = v.vector.asBreeze
+      expectedMin = min(expectedMin, tempVector)
+      expectedMax = max(expectedMax, tempVector)
+    }
+
+    //ensure that estimated Min and Max vectors equal the expected ones
+    val estimatedMinMax = minMaxScaler.metricsOption.get.collect()
+    estimatedMinMax.head shouldEqual(expectedMin, expectedMax)
+
+    //handle the case where a feature takes only one value
+    val expectedRangePerFeature = (expectedMax - expectedMin)
+    for (i <- 0 until expectedRangePerFeature.size) {
+      if (expectedRangePerFeature(i) == 0.0) {
+        expectedRangePerFeature(i)= 1.0
+      }
+    }
+
+    //ensure that LabeledVectors were scaled correctly
+    for (i <- 0 until labeledData.length) {
+      var expectedVector = labeledData(i).vector.asBreeze - expectedMin
+      expectedVector :/= expectedRangePerFeature
+      expectedVector = (expectedVector :* (1.0 + 1.0)) - 1.0
+
+      labeledData(i).label shouldEqual scaledVectors(i).label
+      expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i)
+        .vector
+    }
+  }
+}
+
+
+object MinMaxScalerData {
+
+  val data: Seq[Vector] = List(
+    DenseVector(Array(2104.00, 3.00, 0.0)),
+    DenseVector(Array(1600.00, 3.00, 0.0)),
+    DenseVector(Array(2400.00, 3.00, 0.0)),
+    DenseVector(Array(1416.00, 2.00, 0.0)),
+    DenseVector(Array(3000.00, 4.00, 0.0)),
+    DenseVector(Array(1985.00, 4.00, 0.0)),
+    DenseVector(Array(1534.00, 3.00, 0.0)),
+    DenseVector(Array(1427.00, 3.00, 0.0)),
+    DenseVector(Array(1380.00, 3.00, 0.0)),
+    DenseVector(Array(1494.00, 3.00, 0.0)),
+    DenseVector(Array(1940.00, 4.00, 0.0)),
+    DenseVector(Array(2000.00, 3.00, 0.0)),
+    DenseVector(Array(1890.00, 3.00, 0.0)),
+    DenseVector(Array(4478.00, 5.00, 0.0)),
+    DenseVector(Array(1268.00, 3.00, 0.0)),
+    DenseVector(Array(2300.00, 4.00, 0.0)),
+    DenseVector(Array(1320.00, 2.00, 0.0)),
+    DenseVector(Array(1236.00, 3.00, 0.0)),
+    DenseVector(Array(2609.00, 4.00, 0.0)),
+    DenseVector(Array(3031.00, 4.00, 0.0)),
+    DenseVector(Array(1767.00, 3.00, 0.0)),
+    DenseVector(Array(1888.00, 2.00, 0.0)),
+    DenseVector(Array(1604.00, 3.00, 0.0)),
+    DenseVector(Array(1962.00, 4.00, 0.0)),
+    DenseVector(Array(3890.00, 3.00, 0.0)),
+    DenseVector(Array(1100.00, 3.00, 0.0)),
+    DenseVector(Array(1458.00, 3.00, 0.0)),
+    DenseVector(Array(2526.00, 3.00, 0.0)),
+    DenseVector(Array(2200.00, 3.00, 0.0)),
+    DenseVector(Array(2637.00, 3.00, 0.0)),
+    DenseVector(Array(1839.00, 2.00, 0.0)),
+    DenseVector(Array(1000.00, 1.00, 0.0)),
+    DenseVector(Array(2040.00, 4.00, 0.0)),
+    DenseVector(Array(3137.00, 3.00, 0.0)),
+    DenseVector(Array(1811.00, 4.00, 0.0)),
+    DenseVector(Array(1437.00, 3.00, 0.0)),
+    DenseVector(Array(1239.00, 3.00, 0.0)),
+    DenseVector(Array(2132.00, 4.00, 0.0)),
+    DenseVector(Array(4215.00, 4.00, 0.0)),
+    DenseVector(Array(2162.00, 4.00, 0.0)),
+    DenseVector(Array(1664.00, 2.00, 0.0)),
+    DenseVector(Array(2238.00, 3.00, 0.0)),
+    DenseVector(Array(2567.00, 4.00, 0.0)),
+    DenseVector(Array(1200.00, 3.00, 0.0)),
+    DenseVector(Array(852.00, 2.00, 0.0)),
+    DenseVector(Array(1852.00, 4.00, 0.0)),
+    DenseVector(Array(1203.00, 3.00, 0.0))
+  )
+
+  val labeledData: Seq[LabeledVector] = List(
+    LabeledVector(1.0, DenseVector(Array(2104.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1600.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2400.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1416.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3000.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1985.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1534.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1427.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1380.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1494.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1940.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2000.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1890.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(4478.00, 5.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1268.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2300.00, 4.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1320.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1236.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2609.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3031.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1767.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1888.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1604.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1962.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3890.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1100.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1458.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2526.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2200.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2637.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1839.00, 2.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1000.00, 1.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2040.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(3137.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1811.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1437.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1239.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2132.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(4215.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2162.00, 4.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(1664.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2238.00, 3.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(2567.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1200.00, 3.00, 0.0))),
+    LabeledVector(0.0, DenseVector(Array(852.00, 2.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1852.00, 4.00, 0.0))),
+    LabeledVector(1.0, DenseVector(Array(1203.00, 3.00, 0.0)))
+  )
+}


[2/2] flink git commit: [ml] Makes StandardScaler's state package private and reduces redundant code. Adjusts flink-ml readme.

Posted by tr...@apache.org.
[ml] Makes StandardScaler's state package private and reduces redundant code. Adjusts flink-ml readme.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97611c24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97611c24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97611c24

Branch: refs/heads/master
Commit: 97611c245f4df5820124fba25e55a2bac59086b4
Parents: 73f9911
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 9 10:23:09 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 9 10:29:24 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-ml/README.md                |  5 +--
 .../flink/ml/preprocessing/StandardScaler.scala | 44 +++++++++++++-------
 2 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97611c24/flink-staging/flink-ml/README.md
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/README.md b/flink-staging/flink-ml/README.md
index 5721e8f..5cabd7c 100644
--- a/flink-staging/flink-ml/README.md
+++ b/flink-staging/flink-ml/README.md
@@ -7,10 +7,9 @@ Theses implementations allow to scale to data sizes which vastly exceed the memo
 Flink-ML currently comprises the following algorithms:
 
 * Classification
+** Soft-margin SVM
 * Regression
-** Logistic regression
-* Clustering
-** k-Means
+** Multiple linear regression
 * Recommendation
 ** Alternating least squares (ALS)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97611c24/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 3b9c8d2..bf09b20 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -21,10 +21,8 @@ package org.apache.flink.ml.preprocessing
 import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
-import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.ml._
 import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
@@ -62,7 +60,9 @@ import scala.reflect.ClassTag
   */
 class StandardScaler extends Transformer[StandardScaler] {
 
-  var metricsOption: Option[DataSet[(linalg.Vector[Double], linalg.Vector[Double])]] = None
+  private[preprocessing] var metricsOption: Option[
+      DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
+    ] = None
 
   /** Sets the target mean of the transformed data
     *
@@ -213,12 +213,7 @@ object StandardScaler {
             input.mapWithBcVariable(metrics){
               (vector, metrics) => {
                 val (broadcastMean, broadcastStd) = metrics
-                var myVector = vector.asBreeze
-
-                myVector -= broadcastMean
-                myVector :/= broadcastStd
-                myVector = (myVector :* std) + mean
-                myVector.fromBreeze
+                scaleVector(vector, broadcastMean, broadcastStd, mean, std)
               }
             }
           }
@@ -245,12 +240,8 @@ object StandardScaler {
               (labeledVector, metrics) => {
                 val (broadcastMean, broadcastStd) = metrics
                 val LabeledVector(label, vector) = labeledVector
-                var breezeVector = vector.asBreeze
 
-                breezeVector -= broadcastMean
-                breezeVector :/= broadcastStd
-                breezeVector = (breezeVector :* std) + mean
-                LabeledVector(label, breezeVector.fromBreeze)
+                LabeledVector(label, scaleVector(vector, broadcastMean, broadcastStd, mean, std))
               }
             }
           }
@@ -262,4 +253,29 @@ object StandardScaler {
       }
     }
   }
+
+  /** Scales the given vector such that it has the given mean and std
+    *
+    * @param vector Vector to be scaled
+    * @param dataMean Mean of the training data
+    * @param dataStd Standard deviation of the training data
+    * @param mean Mean of the scaled data
+    * @param std Standard deviation of the scaled data
+    * @tparam T Type of [[Vector]]
+    * @return Scaled vector
+    */
+  private def scaleVector[T <: Vector: BreezeVectorConverter](
+      vector: T,
+      dataMean: linalg.Vector[Double],
+      dataStd: linalg.Vector[Double],
+      mean: Double,
+      std: Double)
+    : T = {
+    var myVector = vector.asBreeze
+
+    myVector -= dataMean
+    myVector :/= dataStd
+    myVector = (myVector :* std) + mean
+    myVector.fromBreeze
+  }
 }