You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2015/06/09 10:29:52 UTC
[1/2] flink git commit: [FLINK-1844] [ml] Add MinMaxScaler
implementation in the preprocessing package,
test for the corresponding functionality and documentation.
Repository: flink
Updated Branches:
refs/heads/master e0d60ddbd -> 97611c245
[FLINK-1844] [ml] Add MinMaxScaler implementation in the preprocessing package, test for the corresponding functionality and documentation.
This closes #798.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/73f99117
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/73f99117
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/73f99117
Branch: refs/heads/master
Commit: 73f99117f783a1f5c3ae67ade4716512b3ddfd66
Parents: e0d60dd
Author: fobeligi <fa...@gmail.com>
Authored: Fri Jun 5 23:12:43 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 9 10:25:36 2015 +0200
----------------------------------------------------------------------
docs/libs/ml/index.md | 1 +
docs/libs/ml/min_max_scaler.md | 112 ++++++++
.../flink/ml/preprocessing/MinMaxScaler.scala | 264 +++++++++++++++++++
.../ml/preprocessing/MinMaxScalerITSuite.scala | 243 +++++++++++++++++
4 files changed, 620 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/docs/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/index.md b/docs/libs/ml/index.md
index f61480f..de9137d 100644
--- a/docs/libs/ml/index.md
+++ b/docs/libs/ml/index.md
@@ -43,6 +43,7 @@ FlinkML currently supports the following algorithms:
* [Polynomial Features](polynomial_features.html)
* [Standard Scaler](standard_scaler.html)
+* [MinMax Scaler](min_max_scaler.html)
### Recommendation
http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/docs/libs/ml/min_max_scaler.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/min_max_scaler.md b/docs/libs/ml/min_max_scaler.md
new file mode 100644
index 0000000..0c00dcd
--- /dev/null
+++ b/docs/libs/ml/min_max_scaler.md
@@ -0,0 +1,112 @@
+---
+mathjax: include
+htmlTitle: FlinkML - MinMax Scaler
+title: <a href="../ml">FlinkML</a> - MinMax Scaler
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The MinMax scaler scales the given data set so that all values lie within a user-specified range [min,max].
+ In case the user does not provide a specific minimum and maximum value for the scaling range, the MinMax scaler transforms the features of the input data set to lie in the [0,1] interval.
+ Given a set of input data $x_1, x_2,... x_n$, with minimum value:
+
+ $$x_{min} = \min(\{x_1, x_2, \ldots, x_n\})$$
+
+ and maximum value:
+
+ $$x_{max} = \max(\{x_1, x_2, \ldots, x_n\})$$
+
+The scaled data set $z_1, z_2,...,z_n$ will be:
+
+ $$z_{i}= \frac{x_{i} - x_{min}}{x_{max} - x_{min}} \left ( max - min \right ) + min$$
+
+where $\textit{min}$ and $\textit{max}$ are the user specified minimum and maximum values of the range to scale.
+
+## Operations
+
+`MinMaxScaler` is a `Transformer`.
+As such, it supports the `fit` and `transform` operations.
+
+### Fit
+
+MinMaxScaler is trained on all subtypes of `Vector` or `LabeledVector`:
+
+* `fit[T <: Vector]: DataSet[T] => Unit`
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Transform
+
+MinMaxScaler transforms all subtypes of `Vector` or `LabeledVector` into the respective type:
+
+* `transform[T <: Vector]: DataSet[T] => DataSet[T]`
+* `transform: DataSet[LabeledVector] => DataSet[LabeledVector]`
+
+## Parameters
+
+The MinMax scaler implementation can be controlled by the following two parameters:
+
+ <table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Parameters</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>Min</strong></td>
+ <td>
+ <p>
+ The minimum value of the range for the scaled data set. (Default value: <strong>0.0</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Max</strong></td>
+ <td>
+ <p>
+ The maximum value of the range for the scaled data set. (Default value: <strong>1.0</strong>)
+ </p>
+ </td>
+ </tr>
+ </tbody>
+</table>
+
+## Examples
+
+{% highlight scala %}
+// Create MinMax scaler transformer
+val minMaxscaler = MinMaxScaler()
+ .setMin(-1.0)
+
+// Obtain data set to be scaled
+val dataSet: DataSet[Vector] = ...
+
+// Learn the minimum and maximum values of the training data
+minMaxscaler.fit(dataSet)
+
+// Scale the provided data set to have min=-1.0 and max=1.0
+val scaledDS = minMaxscaler.transform(dataSet)
+{% endhighlight %}
http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
new file mode 100644
index 0000000..bded9c6
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg
+import breeze.linalg.{max, min}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.ml._
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{FitOperation, TransformOperation, Transformer}
+import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min}
+
+import scala.reflect.ClassTag
+
+/** Scales observations, so that all features are in a user-specified range.
+ * By default for [[MinMaxScaler]] transformer range = [0,1].
+ *
+ * This transformer takes a subtype of [[Vector]] of values and maps it to a
+ * scaled subtype of [[Vector]] such that each feature lies between a user-specified range.
+ *
+ * This transformer can be prepended to all [[Transformer]] and
+ * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype
+ * of [[Vector]] or a [[LabeledVector]].
+ *
+ * @example
+ * {{{
+ * val trainingDS: DataSet[Vector] = env.fromCollection(data)
+ * val transformer = MinMaxScaler().setMin(-1.0)
+ *
+ * transformer.fit(trainingDS)
+ * val transformedDS = transformer.transform(trainingDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[Min]]: The minimum value of the range of the transformed data set; by default equal to 0
+ * - [[Max]]: The maximum value of the range of the transformed data set; by default
+ * equal to 1
+ */
+class MinMaxScaler extends Transformer[MinMaxScaler] {
+
+ private [preprocessing] var metricsOption: Option[
+ DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
+ ] = None
+
+ /** Sets the minimum for the range of the transformed data
+ *
+ * @param min the user-specified minimum value.
+ * @return the MinMaxScaler instance with its minimum value set to the user-specified value.
+ */
+ def setMin(min: Double): MinMaxScaler = {
+ parameters.add(Min, min)
+ this
+ }
+
+ /** Sets the maximum for the range of the transformed data
+ *
+ * @param max the user-specified maximum value.
+ * @return the MinMaxScaler instance with its maximum value set to the user-specified value.
+ */
+ def setMax(max: Double): MinMaxScaler = {
+ parameters.add(Max, max)
+ this
+ }
+}
+
+object MinMaxScaler {
+
+ // ====================================== Parameters =============================================
+
+ case object Min extends Parameter[Double] {
+ override val defaultValue: Option[Double] = Some(0.0)
+ }
+
+ case object Max extends Parameter[Double] {
+ override val defaultValue: Option[Double] = Some(1.0)
+ }
+
+ // ==================================== Factory methods ==========================================
+
+ def apply(): MinMaxScaler = {
+ new MinMaxScaler()
+ }
+
+ // ====================================== Operations =============================================
+
+ /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of each feature of the
+ * training data. These values are used in the transform step to transform the given input data.
+ *
+ * @tparam T Input data type which is a subtype of [[Vector]]
+ * @return [[FitOperation]] training the [[MinMaxScaler]] on subtypes of [[Vector]]
+ */
+ implicit def fitVectorMinMaxScaler[T <: Vector] = new FitOperation[MinMaxScaler, T] {
+ override def fit(instance: MinMaxScaler, fitParameters: ParameterMap, input: DataSet[T])
+ : Unit = {
+ val metrics = extractFeatureMinMaxVectors(input)
+
+ instance.metricsOption = Some(metrics)
+ }
+ }
+
+ /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of the features of the
+ * training data which is of type [[LabeledVector]]. The minimum and maximum are used to
+ * transform the given input data.
+ *
+ */
+ implicit val fitLabeledVectorMinMaxScaler = {
+ new FitOperation[MinMaxScaler, LabeledVector] {
+ override def fit(
+ instance: MinMaxScaler,
+ fitParameters: ParameterMap,
+ input: DataSet[LabeledVector])
+ : Unit = {
+ val vectorDS = input.map(_.vector)
+ val metrics = extractFeatureMinMaxVectors(vectorDS)
+
+ instance.metricsOption = Some(metrics)
+ }
+ }
+ }
+
+ /** Calculates in one pass over the data the features' minimum and maximum values.
+ *
+ * @param dataSet The data set for which we want to calculate the minimum and maximum values.
+ * @return DataSet containing a single tuple of two vectors (minVector, maxVector).
+ * The first vector represents the minimum values vector and the second is the maximum
+ * values vector.
+ */
+ private def extractFeatureMinMaxVectors[T <: Vector](dataSet: DataSet[T])
+ : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
+
+ val minMax = dataSet.map {
+ v => (v.asBreeze, v.asBreeze)
+ }.reduce {
+ (minMax1, minMax2) => {
+
+ val tempMinimum = min(minMax1._1, minMax2._1)
+ val tempMaximum = max(minMax1._2, minMax2._2)
+
+ (tempMinimum, tempMaximum)
+ }
+ }
+ minMax
+ }
+
+ /** [[TransformOperation]] which scales input data of subtype of [[Vector]] with respect to
+ * the calculated minimum and maximum of the training data. The minimum and maximum
+ * values of the resulting data are configurable.
+ *
+ * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
+ * @return [[TransformOperation]] scaling subtypes of [[Vector]] such that the feature values are
+ * in the configured range
+ */
+ implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
+ = {
+ new TransformOperation[MinMaxScaler, T, T] {
+ override def transform(
+ instance: MinMaxScaler,
+ transformParameters: ParameterMap,
+ input: DataSet[T])
+ : DataSet[T] = {
+
+ val resultingParameters = instance.parameters ++ transformParameters
+ val min = resultingParameters(Min)
+ val max = resultingParameters(Max)
+
+ instance.metricsOption match {
+ case Some(metrics) => {
+ input.mapWithBcVariable(metrics) {
+ (vector, metrics) => {
+ val (broadcastMin, broadcastMax) = metrics
+ scaleVector(vector, broadcastMin, broadcastMax, min, max)
+ }
+ }
+ }
+
+ case None =>
+ throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " +
+ "This is necessary to estimate the minimum and maximum of the data.")
+ }
+ }
+ }
+ }
+
+ implicit val transformLabeledVectors = {
+ new TransformOperation[MinMaxScaler, LabeledVector, LabeledVector] {
+ override def transform(instance: MinMaxScaler,
+ transformParameters: ParameterMap,
+ input: DataSet[LabeledVector]): DataSet[LabeledVector] = {
+ val resultingParameters = instance.parameters ++ transformParameters
+ val min = resultingParameters(Min)
+ val max = resultingParameters(Max)
+
+ instance.metricsOption match {
+ case Some(metrics) => {
+ input.mapWithBcVariable(metrics) {
+ (labeledVector, metrics) => {
+ val (broadcastMin, broadcastMax) = metrics
+ val LabeledVector(label, vector) = labeledVector
+
+ LabeledVector(label, scaleVector(vector, broadcastMin, broadcastMax, min, max))
+ }
+ }
+ }
+
+ case None =>
+ throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " +
+ "This is necessary to estimate the minimum and maximum of the data.")
+ }
+ }
+ }
+ }
+
+ /** Scales a vector such that its features lie in the range [min, max]
+ *
+ * @param vector Vector to scale
+ * @param broadcastMin Vector containing for each feature the minimal value in the training set
+ * @param broadcastMax Vector containing for each feature the maximal value in the training set
+ * @param min Minimal value of range
+ * @param max Maximal value of range
+ * @tparam T Type of [[Vector]]
+ * @return Scaled feature vector
+ */
+ private def scaleVector[T <: Vector: BreezeVectorConverter](
+ vector: T,
+ broadcastMin: linalg.Vector[Double],
+ broadcastMax: linalg.Vector[Double],
+ min: Double,
+ max: Double)
+ : T = {
+ var myVector = vector.asBreeze
+
+ //handle the case where a feature takes only one value
+ val rangePerFeature = (broadcastMax - broadcastMin)
+ for (i <- 0 until rangePerFeature.size) {
+ if (rangePerFeature(i) == 0.0) {
+ rangePerFeature(i)= 1.0
+ }
+ }
+
+ myVector -= broadcastMin
+ myVector :/= rangePerFeature
+ myVector = (myVector :* (max - min)) + min
+ myVector.fromBreeze
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/73f99117/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
new file mode 100644
index 0000000..75ac442
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/MinMaxScalerITSuite.scala
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.ml.preprocessing
+
+import breeze.linalg.{max, min}
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.{DenseVector, Vector}
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class MinMaxScalerITSuite
+ extends FlatSpec
+ with Matchers
+ with FlinkTestBase {
+
+ behavior of "Flink's MinMax Scaler"
+
+ import MinMaxScalerData._
+
+ it should "scale the vectors' values to be restricted in the [0.0,1.0] range" in {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val dataSet = env.fromCollection(data)
+ val minMaxScaler = MinMaxScaler()
+ minMaxScaler.fit(dataSet)
+ val scaledVectors = minMaxScaler.transform(dataSet).collect
+
+ scaledVectors.length should equal(data.length)
+
+ //ensure data lies in the user-specified range
+ for (vector <- scaledVectors) {
+ val test = vector.asBreeze.forall(fv => {
+ fv >= 0.0 && fv <= 1.0
+ })
+ test shouldEqual true
+ }
+
+ var expectedMin = data.head.asBreeze
+ var expectedMax = data.head.asBreeze
+
+ for (v <- data.tail) {
+ val tempVector = v.asBreeze
+ expectedMin = min(expectedMin, tempVector)
+ expectedMax = max(expectedMax, tempVector)
+ }
+
+ //ensure that estimated Min and Max vectors equal the expected ones
+ val estimatedMinMax = minMaxScaler.metricsOption.get.collect()
+ estimatedMinMax.head shouldEqual(expectedMin, expectedMax)
+
+ //handle the case where a feature takes only one value
+ val expectedRangePerFeature = (expectedMax - expectedMin)
+ for (i <- 0 until expectedRangePerFeature.size) {
+ if (expectedRangePerFeature(i) == 0.0) {
+ expectedRangePerFeature(i)= 1.0
+ }
+ }
+
+ //ensure that vectors were scaled correctly
+ for (i <- 0 until data.length) {
+ var expectedVector = data(i).asBreeze - expectedMin
+ expectedVector :/= expectedRangePerFeature
+ expectedVector = expectedVector :* (1.0 - 0.0)
+
+ expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i)
+ }
+ }
+
+ it should "scale vectors' values in the [-1.0,1.0] range" in {
+
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val dataSet = env.fromCollection(labeledData)
+ val minMaxScaler = MinMaxScaler().setMin(-1.0)
+ minMaxScaler.fit(dataSet)
+ val scaledVectors = minMaxScaler.transform(dataSet).collect
+
+ scaledVectors.length should equal(labeledData.length)
+
+ //ensure data lies in the user-specified range
+ for (labeledVector <- scaledVectors) {
+ val test = labeledVector.vector.asBreeze.forall(lv => {
+ lv >= -1.0 && lv <= 1.0
+ })
+ test shouldEqual true
+ }
+
+ var expectedMin = labeledData.head.vector.asBreeze
+ var expectedMax = labeledData.head.vector.asBreeze
+
+ for (v <- labeledData.tail) {
+ val tempVector = v.vector.asBreeze
+ expectedMin = min(expectedMin, tempVector)
+ expectedMax = max(expectedMax, tempVector)
+ }
+
+ //ensure that estimated Min and Max vectors equal the expected ones
+ val estimatedMinMax = minMaxScaler.metricsOption.get.collect()
+ estimatedMinMax.head shouldEqual(expectedMin, expectedMax)
+
+ //handle the case where a feature takes only one value
+ val expectedRangePerFeature = (expectedMax - expectedMin)
+ for (i <- 0 until expectedRangePerFeature.size) {
+ if (expectedRangePerFeature(i) == 0.0) {
+ expectedRangePerFeature(i)= 1.0
+ }
+ }
+
+ //ensure that LabeledVectors were scaled correctly
+ for (i <- 0 until labeledData.length) {
+ var expectedVector = labeledData(i).vector.asBreeze - expectedMin
+ expectedVector :/= expectedRangePerFeature
+ expectedVector = (expectedVector :* (1.0 + 1.0)) - 1.0
+
+ labeledData(i).label shouldEqual scaledVectors(i).label
+ expectedVector.fromBreeze.toSeq should contain theSameElementsInOrderAs scaledVectors(i)
+ .vector
+ }
+ }
+}
+
+
+object MinMaxScalerData {
+
+ val data: Seq[Vector] = List(
+ DenseVector(Array(2104.00, 3.00, 0.0)),
+ DenseVector(Array(1600.00, 3.00, 0.0)),
+ DenseVector(Array(2400.00, 3.00, 0.0)),
+ DenseVector(Array(1416.00, 2.00, 0.0)),
+ DenseVector(Array(3000.00, 4.00, 0.0)),
+ DenseVector(Array(1985.00, 4.00, 0.0)),
+ DenseVector(Array(1534.00, 3.00, 0.0)),
+ DenseVector(Array(1427.00, 3.00, 0.0)),
+ DenseVector(Array(1380.00, 3.00, 0.0)),
+ DenseVector(Array(1494.00, 3.00, 0.0)),
+ DenseVector(Array(1940.00, 4.00, 0.0)),
+ DenseVector(Array(2000.00, 3.00, 0.0)),
+ DenseVector(Array(1890.00, 3.00, 0.0)),
+ DenseVector(Array(4478.00, 5.00, 0.0)),
+ DenseVector(Array(1268.00, 3.00, 0.0)),
+ DenseVector(Array(2300.00, 4.00, 0.0)),
+ DenseVector(Array(1320.00, 2.00, 0.0)),
+ DenseVector(Array(1236.00, 3.00, 0.0)),
+ DenseVector(Array(2609.00, 4.00, 0.0)),
+ DenseVector(Array(3031.00, 4.00, 0.0)),
+ DenseVector(Array(1767.00, 3.00, 0.0)),
+ DenseVector(Array(1888.00, 2.00, 0.0)),
+ DenseVector(Array(1604.00, 3.00, 0.0)),
+ DenseVector(Array(1962.00, 4.00, 0.0)),
+ DenseVector(Array(3890.00, 3.00, 0.0)),
+ DenseVector(Array(1100.00, 3.00, 0.0)),
+ DenseVector(Array(1458.00, 3.00, 0.0)),
+ DenseVector(Array(2526.00, 3.00, 0.0)),
+ DenseVector(Array(2200.00, 3.00, 0.0)),
+ DenseVector(Array(2637.00, 3.00, 0.0)),
+ DenseVector(Array(1839.00, 2.00, 0.0)),
+ DenseVector(Array(1000.00, 1.00, 0.0)),
+ DenseVector(Array(2040.00, 4.00, 0.0)),
+ DenseVector(Array(3137.00, 3.00, 0.0)),
+ DenseVector(Array(1811.00, 4.00, 0.0)),
+ DenseVector(Array(1437.00, 3.00, 0.0)),
+ DenseVector(Array(1239.00, 3.00, 0.0)),
+ DenseVector(Array(2132.00, 4.00, 0.0)),
+ DenseVector(Array(4215.00, 4.00, 0.0)),
+ DenseVector(Array(2162.00, 4.00, 0.0)),
+ DenseVector(Array(1664.00, 2.00, 0.0)),
+ DenseVector(Array(2238.00, 3.00, 0.0)),
+ DenseVector(Array(2567.00, 4.00, 0.0)),
+ DenseVector(Array(1200.00, 3.00, 0.0)),
+ DenseVector(Array(852.00, 2.00, 0.0)),
+ DenseVector(Array(1852.00, 4.00, 0.0)),
+ DenseVector(Array(1203.00, 3.00, 0.0))
+ )
+
+ val labeledData: Seq[LabeledVector] = List(
+ LabeledVector(1.0, DenseVector(Array(2104.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1600.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2400.00, 3.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(1416.00, 2.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(3000.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1985.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1534.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1427.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1380.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1494.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1940.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2000.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1890.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(4478.00, 5.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1268.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2300.00, 4.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(1320.00, 2.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1236.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2609.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(3031.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1767.00, 3.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(1888.00, 2.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1604.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1962.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(3890.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1100.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1458.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2526.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2200.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2637.00, 3.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(1839.00, 2.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(1000.00, 1.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2040.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(3137.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1811.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1437.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1239.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2132.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(4215.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2162.00, 4.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(1664.00, 2.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2238.00, 3.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(2567.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1200.00, 3.00, 0.0))),
+ LabeledVector(0.0, DenseVector(Array(852.00, 2.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1852.00, 4.00, 0.0))),
+ LabeledVector(1.0, DenseVector(Array(1203.00, 3.00, 0.0)))
+ )
+}
[2/2] flink git commit: [ml] Makes StandardScaler's state package
private and reduces redundant code. Adjusts flink-ml readme.
Posted by tr...@apache.org.
[ml] Makes StandardScaler's state package private and reduces redundant code. Adjusts flink-ml readme.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97611c24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97611c24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97611c24
Branch: refs/heads/master
Commit: 97611c245f4df5820124fba25e55a2bac59086b4
Parents: 73f9911
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jun 9 10:23:09 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jun 9 10:29:24 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-ml/README.md | 5 +--
.../flink/ml/preprocessing/StandardScaler.scala | 44 +++++++++++++-------
2 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/97611c24/flink-staging/flink-ml/README.md
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/README.md b/flink-staging/flink-ml/README.md
index 5721e8f..5cabd7c 100644
--- a/flink-staging/flink-ml/README.md
+++ b/flink-staging/flink-ml/README.md
@@ -7,10 +7,9 @@ Theses implementations allow to scale to data sizes which vastly exceed the memo
Flink-ML currently comprises the following algorithms:
* Classification
+** Soft-margin SVM
* Regression
-** Logistic regression
-* Clustering
-** k-Means
+** Multiple linear regression
* Recommendation
** Alternating least squares (ALS)
http://git-wip-us.apache.org/repos/asf/flink/blob/97611c24/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index 3b9c8d2..bf09b20 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -21,10 +21,8 @@ package org.apache.flink.ml.preprocessing
import breeze.linalg
import breeze.numerics.sqrt
import breeze.numerics.sqrt._
-import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
import org.apache.flink.ml._
import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
import org.apache.flink.ml.math.Breeze._
@@ -62,7 +60,9 @@ import scala.reflect.ClassTag
*/
class StandardScaler extends Transformer[StandardScaler] {
- var metricsOption: Option[DataSet[(linalg.Vector[Double], linalg.Vector[Double])]] = None
+ private[preprocessing] var metricsOption: Option[
+ DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
+ ] = None
/** Sets the target mean of the transformed data
*
@@ -213,12 +213,7 @@ object StandardScaler {
input.mapWithBcVariable(metrics){
(vector, metrics) => {
val (broadcastMean, broadcastStd) = metrics
- var myVector = vector.asBreeze
-
- myVector -= broadcastMean
- myVector :/= broadcastStd
- myVector = (myVector :* std) + mean
- myVector.fromBreeze
+ scaleVector(vector, broadcastMean, broadcastStd, mean, std)
}
}
}
@@ -245,12 +240,8 @@ object StandardScaler {
(labeledVector, metrics) => {
val (broadcastMean, broadcastStd) = metrics
val LabeledVector(label, vector) = labeledVector
- var breezeVector = vector.asBreeze
- breezeVector -= broadcastMean
- breezeVector :/= broadcastStd
- breezeVector = (breezeVector :* std) + mean
- LabeledVector(label, breezeVector.fromBreeze)
+ LabeledVector(label, scaleVector(vector, broadcastMean, broadcastStd, mean, std))
}
}
}
@@ -262,4 +253,29 @@ object StandardScaler {
}
}
}
+
+ /** Scales the given vector such that it has the given mean and std
+ *
+ * @param vector Vector to be scaled
+ * @param dataMean Mean of the training data
+ * @param dataStd Standard deviation of the training data
+ * @param mean Mean of the scaled data
+ * @param std Standard deviation of the scaled data
+ * @tparam T Type of [[Vector]]
+ * @return Scaled vector
+ */
+ private def scaleVector[T <: Vector: BreezeVectorConverter](
+ vector: T,
+ dataMean: linalg.Vector[Double],
+ dataStd: linalg.Vector[Double],
+ mean: Double,
+ std: Double)
+ : T = {
+ var myVector = vector.asBreeze
+
+ myVector -= dataMean
+ myVector :/= dataStd
+ myVector = (myVector :* std) + mean
+ myVector.fromBreeze
+ }
}