Posted to issues@flink.apache.org by Fokko <gi...@git.apache.org> on 2017/01/06 17:05:29 UTC

[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

GitHub user Fokko opened a pull request:

    https://github.com/apache/flink/pull/3077

    [FLINK-5423] Implement Stochastic Outlier Selection

    Implemented the Stochastic Outlier Selection algorithm in the Machine Learning library, including the test code. Added documentation.
    
    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Fokko/flink fd-implement-stochastic-outlier-selection

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3077.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3077
    
----
commit a67b170f6eee6c053322a4730f1b8dcaa680a112
Author: Fokko Driesprong <fo...@godatadriven.com>
Date:   2016-12-30T06:38:52Z

    Implemented the Stochastic Outlier Selection algorithm in the Machine Learning library, including the test code. Added documentation.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96203398
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Janssens
    +  *
    +  * For more information about SOS, see https://github.com/jeroenjanssens/sos
    +  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +  * Tilburg, the Netherlands, 2012.
    +  *
    +  * @example
    +  *          {{{
    +  *             val inputDS = env.fromCollection(List(
    +  *               LabeledVector(0.0, DenseVector(1.0, 1.0)),
    +  *               LabeledVector(1.0, DenseVector(2.0, 1.0)),
    +  *               LabeledVector(2.0, DenseVector(1.0, 2.0)),
    +  *               LabeledVector(3.0, DenseVector(2.0, 2.0)),
    +  *               LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    +  *             ))
    +  *
    +  *             val sos = StochasticOutlierSelection()
    +  *               .setPerplexity(3)
    +  *
    +  *             val outputDS = sos.transform(inputDS)
    +  *
    +  *             val expectedOutputDS = Array(
    +  *                0.2790094479202896,
    +  *                0.25775014551682535,
    +  *                0.22136130977995766,
    +  *                0.12707053787018444,
    +  *                0.9922779902453757 // The outlier!
    +  *             )
    +  *
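    +  *             // transform yields (index, outlierness) pairs; collect and order them by index
    +  *             val outlierness = outputDS.collect().sortBy(_._1).map(_._2)
    +  *
    +  *             assert(outlierness == expectedOutputDS.toSeq)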
    +  *          }}}
    +  *
    +  * =Parameters=
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    +  *  Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that
    +  *  in SOS being a neighbor is not a binary property but a probabilistic one. It should be between
    +  *  1 and n-1, where n is the number of observations.
    +  *  (Default value: '''30''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    +  *  The accepted error tolerance. Increasing this number sacrifices accuracy in return for
    +  *  reduced computation time.
    +  *  (Default value: '''1e-20''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    +  *  The maximum number of iterations to perform. (Default value: '''5000''')
    +  */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +
    +  /** Sets the perplexity of the outlier selection algorithm, which can be seen as the k of kNN.
    +    * For more information, please read the Stochastic Outlier Selection algorithm paper.
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
    +
    +  /** Sets the accepted error tolerance, which trades accuracy for computation time when
    +    * computing the affinity.
    +    *
    +    * @param errorTolerance the accepted error tolerance with respect to the affinity
    +    * @return
    +    */
    +  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = {
    +    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
    +    parameters.add(ErrorTolerance, errorTolerance)
    +    this
    +  }
    +
    +  /** Sets the maximum number of iterations used to approximate the affinity.
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @return
    +    */
    +  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
    +    require(maxIterations > 0, "Maximum iterations must be positive.")
    +    parameters.add(MaxIterations, maxIterations)
    +    this
    +  }
    +
    +}
    +
    +object StochasticOutlierSelection extends WithParameters {
    +
    +  // ========================================= Parameters ==========================================
    +  case object Perplexity extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(30)
    +  }
    +
    +  case object ErrorTolerance extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(1e-20)
    +  }
    +
    +  case object MaxIterations extends Parameter[Int] {
    +    val defaultValue: Option[Int] = Some(5000)
    +  }
    +
    +  // ==================================== Factory methods ==========================================
    +
    +  def apply(): StochasticOutlierSelection = {
    +    new StochasticOutlierSelection()
    +  }
    +
    +  // ===================================== Operations ==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[LabeledVector]): DataSet[(Int, Double)] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        val vectorsWithIndex = input.map(labeledVector => {
    +          BreezeLabeledVector(labeledVector.label.toInt, labeledVector.vector.asBreeze)
    +        })
    +
    +        // Don't map back to a labeled vector, since the output of the algorithm is
    +        // a single double instead of a vector
    +        outlierSelection(vectorsWithIndex, resultingParameters)
    +      }
    +    }
    +  }
    +
    +  /** [[TransformDataSetOperation]] which applies the stochastic outlier selection algorithm to a
    +    * [[Vector]], transforming the high-dimensional input into a single Double output.
    +    *
    +    * @tparam T Type of the input data, which has to be a subtype of [[Vector]]
    +    * @return [[TransformDataSetOperation]] producing a single double per input vector which
    +    *         represents its outlierness, where the output is in [0, 1]
    +    */
    +  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
    +  = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[T]): DataSet[Double] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        // Map to the right format
    +        val vectorsWithIndex = input.zipWithIndex.map(vector => {
    +          BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
    +        })
    +
    +        outlierSelection(vectorsWithIndex, resultingParameters).map(_._2)
    +      }
    +    }
    +  }
    +
    +  /** Internal entry point which will execute the different stages of the algorithm using a single
    +    * interface
    +    *
    +    * @param inputVectors        Input vectors on which the stochastic outlier selection algorithm
    +    *                            will be applied
    +    * @param transformParameters The user defined parameters of the algorithm
    +    * @return The outlierness of the vectors compared to each other
    +    */
    +  private def outlierSelection(inputVectors: DataSet[BreezeLabeledVector],
    +                               transformParameters: ParameterMap): DataSet[(Int, Double)] = {
    +    val dissimilarityVectors = computeDissimilarityVectors(inputVectors)
    +    val affinityVectors = computeAffinity(dissimilarityVectors, transformParameters)
    +    val bindingProbabilityVectors = computeBindingProbabilities(affinityVectors)
    +    val outlierProbability = computeOutlierProbability(bindingProbabilityVectors)
    +
    +    outlierProbability
    +  }
    +
    +  /** Computes the pairwise distance from each vector to all other vectors.
    +    *
    +    * @param inputVectors The input vectors; each vector is compared to all other vectors using
    +    *                     the Euclidean distance.
    +    * @return Returns a new set of [[BreezeLabeledVector]] with the dissimilarity vectors
    +    */
    +  def computeDissimilarityVectors(inputVectors: DataSet[BreezeLabeledVector]):
    +  DataSet[BreezeLabeledVector] =
    +  inputVectors.cross(inputVectors) {
    +    (a, b) => (a.idx, b.idx, euclideanDistance(a.data, b.data))
    +  }.filter(dist => dist._1 != dist._2) // Filter out the diagonal, this contains no information.
    +    .groupBy(0)
    +    .sortGroup(1, Order.ASCENDING)
    +    .reduceGroup {
    +      distancesIterator => {
    +        val distances = distancesIterator.toList
    +        val distanceVector = distances.map(_._3).toArray
    +
    +        BreezeLabeledVector(distances.head._1, BreezeDenseVector(distanceVector))
    +      }
    +    }
    +
    +  /** Approximates the affinity by fitting a Gaussian-like function
    +    *
    +    * @param dissimilarityVectors The dissimilarity vectors which represent the distance to the
    +    *                             other vectors in the data set.
    +    * @param resultingParameters  The user defined parameters of the algorithm
    +    * @return Returns a new set of [[BreezeLabeledVector]] with the affinity vectors
    +    */
    +  def computeAffinity(dissimilarityVectors: DataSet[BreezeLabeledVector],
    +                      resultingParameters: ParameterMap): DataSet[BreezeLabeledVector] = {
    +    val logPerplexity = Math.log(resultingParameters(Perplexity))
    +    val maxIterations = resultingParameters(MaxIterations)
    +    val errorTolerance = resultingParameters(ErrorTolerance)
    +
    +    dissimilarityVectors.map(vec => {
    +      val breezeVec = binarySearch(vec.data, logPerplexity, maxIterations, errorTolerance)
    +      BreezeLabeledVector(vec.idx, breezeVec)
    +    })
    +  }
    +
    +  /** Normalizes the affinity vectors so each row sums up to one.
    +    *
    +    * @param affinityVectors The affinity vectors, which quantify the relationship
    +    *                        between the original vectors.
    +    * @return Returns a new set of [[BreezeLabeledVector]] representing the binding
    +    *         probabilities, i.e. the affinities normalized so that each row sums up to one.
    +    */
    +  def computeBindingProbabilities(affinityVectors: DataSet[BreezeLabeledVector]):
    +  DataSet[BreezeLabeledVector] =
    +  affinityVectors.map(vec => BreezeLabeledVector(vec.idx, vec.data :/ sum(vec.data)))
    +
    +  /** Computes the final outlier probability of each vector by taking the product, over its
    +    * column, of one minus the binding probabilities.
    +    *
    +    * @param bindingProbabilityVectors The binding probability vectors where the binding
    +    *                                  probability is based on the affinity and represents the
    +    *                                  probability of a vector binding with another vector.
    +    * @return Returns, for each input vector, its index and its final outlierness.
    +    */
    +  def computeOutlierProbability(bindingProbabilityVectors: DataSet[BreezeLabeledVector]):
    +  DataSet[(Int, Double)] = bindingProbabilityVectors
    +    .flatMap(vec => vec.data.toArray.zipWithIndex.map(pair => {
    +
    +      // The dissimilarity vectors exclude the diagonal, so positions at or after the vector's
    +      // own index must be shifted by one to recover the original column index.
    +      val columnIndex = if (pair._2 >= vec.idx) {
    +        1
    +      } else {
    +        0
    +      }
    +
    +      (columnIndex + pair._2, pair._1)
    +    })).groupBy(0).reduceGroup {
    +    probabilities => {
    +      var rowNumber = -1
    +      var outlierProbability = 1.0
    +      for (probability <- probabilities) {
    +        rowNumber = probability._1
    +        outlierProbability = outlierProbability * (1.0 - probability._2)
    +      }
    +
    +      (rowNumber, outlierProbability)
    +    }
    +  }
    +
    +  /** Performs a binary search to get affinities in such a way that each conditional Gaussian has
    +    *  the same perplexity.
    +    *
    +    * @param dissimilarityVector The input dissimilarity vector which represents the current
    +    *                            vector distance to the other vectors in the data set
    +    * @param logPerplexity The log of the perplexity, i.e. the target entropy of the affinity
    +    *                      distribution of this vector.
    +    * @param maxIterations The maximum iterations to limit the computational time.
    +    * @param tolerance The allowed tolerance to sacrifice precision for decreased computational
    +    *                  time.
    +    * @param beta The current beta
    +    * @param betaMin The lower bound of beta
    +    * @param betaMax The upper bound of beta
    +    * @param iteration The current iteration
    +    * @return Returns the affinity vector of the input vector.
    +    */
    +  def binarySearch(dissimilarityVector: BreezeVector[Double],
    +                   logPerplexity: Double,
    +                   maxIterations: Int,
    +                   tolerance: Double,
    +                   beta: Double = 1.0,
    +                   betaMin: Double = Double.NegativeInfinity,
    +                   betaMax: Double = Double.PositiveInfinity,
    +                   iteration: Int = 0): BreezeVector[Double] = {
    --- End diff --
    
    The unwritten coding style has been to break a long parameter list by putting each parameter on its own line with double indentation, for example:
    ```
    def fun(
        x: Int,
        y: Double)
      : String = {
      // foobar
    }
    ```
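The stages in the diff above (dissimilarity, affinity, binding probability, outlier probability) can also be tried outside Flink on plain Scala arrays. This is a minimal sketch, not the PR's implementation.

The body of `binarySearch` is cut off in this excerpt. The calibration below is therefore an assumption: affinities of the form `exp(-beta * d)` on the Euclidean distances, with a standard bisection on `beta` that stops when the entropy of each row equals `log(perplexity)`.

The object `SosSketch` and its helpers are hypothetical. `binarySearch` mirrors the diff's parameter list and uses the layout suggested in the review comment.

```scala
import scala.annotation.tailrec

// Hypothetical, local (non-Flink) sketch of the SOS stages discussed in this PR.
object SosSketch {

  // Stage 1: pairwise Euclidean distances with the diagonal removed,
  // so row i holds the distances to every j != i, in increasing order of j.
  def dissimilarities(points: Array[Array[Double]]): Array[Array[Double]] =
    points.indices.toArray.map { i =>
      points.indices.filter(_ != i).toArray.map { j =>
        math.sqrt(points(i).zip(points(j)).map { case (a, b) => (a - b) * (a - b) }.sum)
      }
    }

  // Stage 2: bisection on beta until the entropy of the normalized affinities
  // equals log(perplexity). ASSUMPTION: affinity = exp(-beta * d).
  @tailrec
  def binarySearch(
      dissimilarity: Array[Double],
      logPerplexity: Double,
      maxIterations: Int,
      tolerance: Double,
      beta: Double = 1.0,
      betaMin: Double = Double.NegativeInfinity,
      betaMax: Double = Double.PositiveInfinity,
      iteration: Int = 0)
    : Array[Double] = {
    val affinity = dissimilarity.map(d => math.exp(-d * beta))
    val sumA = affinity.sum
    // Shannon entropy (in nats) of affinity / sumA
    val weighted = dissimilarity.zip(affinity).map { case (d, a) => d * a }.sum
    val entropy = math.log(sumA) + beta * weighted / sumA
    val diff = entropy - logPerplexity

    if (iteration >= maxIterations || math.abs(diff) <= tolerance) {
      affinity
    } else if (diff > 0) {
      // Entropy too high (distribution too flat): increase beta
      val next = if (betaMax == Double.PositiveInfinity) beta * 2 else (beta + betaMax) / 2
      binarySearch(dissimilarity, logPerplexity, maxIterations, tolerance,
        next, beta, betaMax, iteration + 1)
    } else {
      // Entropy too low (distribution too peaked): decrease beta
      val next = if (betaMin == Double.NegativeInfinity) beta / 2 else (beta + betaMin) / 2
      binarySearch(dissimilarity, logPerplexity, maxIterations, tolerance,
        next, betaMin, beta, iteration + 1)
    }
  }

  // Stages 3 and 4: normalize each affinity row into binding probabilities, then take,
  // for every column j, the product over rows i != j of (1 - binding(i)(j)).
  def outlierProbabilities(
      points: Array[Array[Double]],
      perplexity: Double,
      maxIterations: Int = 5000,
      tolerance: Double = 1e-20)
    : Array[Double] = {
    val binding = dissimilarities(points).map { row =>
      val affinity = binarySearch(row, math.log(perplexity), maxIterations, tolerance)
      val total = affinity.sum
      affinity.map(_ / total)
    }
    val result = Array.fill(points.length)(1.0)
    for (i <- binding.indices; k <- binding(i).indices) {
      // Row i skips the diagonal, so positions k >= i belong to column k + 1
      val j = if (k >= i) k + 1 else k
      result(j) *= 1.0 - binding(i)(k)
    }
    result
  }
}
```

Because every recursive call is in tail position, `@tailrec` compiles `binarySearch` into a loop. A large `maxIterations`, such as the default of 5000, therefore does not grow the stack.

Try it on the Scaladoc example points with a perplexity of 3. The point (5.0, 8.0) should get the highest score. The exact values depend on the assumed affinity form and need not match the PR's expected output.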



[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3077
  
    Thanks for your contribution @Fokko. I'll take a look at this PR in the next few days :-)



[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on the issue:

    https://github.com/apache/flink/pull/3077
  
    I think we have some flaky tests, since the build passes on my own Travis:
    https://travis-ci.org/Fokko/flink/builds/189855914



[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290365
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Jansen
    +  *
    +  * For more information about SOS, see https://github.com/jeroenjanssens/sos
    +  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +  * Tilburg, the Netherlands, 2012.
    +  *
    +  * @example
    +  *          {{{
    +  *             val inputDS = env.fromCollection(List(
    +  *               LabeledVector(0.0, DenseVector(1.0, 1.0)),
    +  *               LabeledVector(1.0, DenseVector(2.0, 1.0)),
    +  *               LabeledVector(2.0, DenseVector(1.0, 2.0)),
    +  *               LabeledVector(3.0, DenseVector(2.0, 2.0)),
    +  *               LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    +  *             ))
    +  *
    +  *             val sos = StochasticOutlierSelection()
    +  *               .setPerplexity(3)
    +  *
    +  *             val outputDS = sos.transform(inputDS)
    +  *
    +  *             val expectedOutputDS = Array(
    +  *                0.2790094479202896,
    +  *                0.25775014551682535,
    +  *                0.22136130977995766,
    +  *                0.12707053787018444,
    +  *                0.9922779902453757 // The outlier!
    +  *             )
    +  *
    +  *             assert(outputDS == expectedOutputDS)
    +  *          }}}
    +  *
    +  * =Parameters=
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    +  *  Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that
    +  *  in SOS being a neighbor is not a binary property, but a probabilistic one. Should be between
    +  *  1 and n-1, where n is the number of observations.
    +  *  (Default value: '''30''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    +  *  The accepted error tolerance. When increasing this number, it will sacrifice accuracy in
    +  *  return for reduced computational time.
    +  *  (Default value: '''1e-20''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    +  *  The maximum number of iterations to perform. (Default value: '''5000''')
    +  */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +
    +  /** Sets the perplexity of the outlier selection algorithm, can be seen as the k of kNN
    +    * For more information, please read the Stochastic Outlier Selection algorithm paper
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
    +
    +  /** The accepted error tolerance to save computational time when computing the affinity
    +    *
    +    * @param errorTolerance the accepted error tolerance with respect to the affinity
    +    * @return
    +    */
    +  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = {
    +    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
    +    parameters.add(ErrorTolerance, errorTolerance)
    +    this
    +  }
    +
    +  /** The maximum number of iterations to approximate the affinity
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @return
    +    */
    +  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
    +    require(maxIterations > 0, "Maximum iterations must be positive.")
    +    parameters.add(MaxIterations, maxIterations)
    +    this
    +  }
    +
    +}
    +
    +object StochasticOutlierSelection extends WithParameters {
    +
    +  // ========================================= Parameters ==========================================
    +  case object Perplexity extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(30)
    +  }
    +
    +  case object ErrorTolerance extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(1e-20)
    +  }
    +
    +  case object MaxIterations extends Parameter[Int] {
    +    val defaultValue: Option[Int] = Some(5000)
    +  }
    +
    +  // ==================================== Factory methods ==========================================
    +
    +  def apply(): StochasticOutlierSelection = {
    +    new StochasticOutlierSelection()
    +  }
    +
    +  // ===================================== Operations ==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[LabeledVector]): DataSet[(Int, Double)] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        val vectorsWithIndex = input.map(labeledVector => {
    +          BreezeLabeledVector(labeledVector.label.toInt, labeledVector.vector.asBreeze)
    +        })
    +
    +        // Don't map back to a labeled-vector since the output of the algorithm is
    +        // a single double instead of vector
    +        outlierSelection(vectorsWithIndex, resultingParameters)
    +      }
    +    }
    +  }
    +
    +  /** [[TransformDataSetOperation]] applies the stochastic outlier selection algorithm on a
    +    * [[Vector]] which will transform the high-dimensionaly input to a single Double output.
    +    *
    +    * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
    +    * @return [[TransformDataSetOperation]] a single double which represents the oulierness of
    +    *         the input vectors, where the output is in [0, 1]
    +    */
    +  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
    +  = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[T]): DataSet[Double] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        // Map to the right format
    +        val vectorsWithIndex = input.zipWithIndex.map(vector => {
    +          BreezeLabeledVector(vector._1.toInt, vector._2.asBreeze)
    +        })
    +
    +        outlierSelection(vectorsWithIndex, resultingParameters).map(_._2)
    +      }
    +    }
    +  }
    +
    +  /** Internal entry point which will execute the different stages of the algorithm using a single
    +    * interface
    +    *
    +    * @param inputVectors        Input vectors on which the stochastic outlier selection algorithm
    +    *                            will be applied
    +    * @param transformParameters The user defined parameters of the algorithm
    +    * @return The outlierness of the vectors compared to each other
    +    */
    +  private def outlierSelection(inputVectors: DataSet[BreezeLabeledVector],
    +                               transformParameters: ParameterMap): DataSet[(Int, Double)] = {
    +    val dissimilarityVectors = computeDissimilarityVectors(inputVectors)
    +    val affinityVectors = computeAffinity(dissimilarityVectors, transformParameters)
    +    val bindingProbabilityVectors = computeBindingProbabilities(affinityVectors)
    +    val outlierProbability = computeOutlierProbability(bindingProbabilityVectors)
    +
    +    outlierProbability
    +  }
    +
    +  /** Compute pair-wise distance from each vector, to all other vectors.
    +    *
    +    * @param inputVectors The input vectors, will compare the vector to all other vectors based
    +    *                     on an distance method.
    +    * @return Returns new set of [[BreezeLabeledVector]] with dissimilarity vector
    +    */
    +  def computeDissimilarityVectors(inputVectors: DataSet[BreezeLabeledVector]):
    +  DataSet[BreezeLabeledVector] =
    +  inputVectors.cross(inputVectors) {
    +    (a, b) => (a.idx, b.idx, euclideanDistance(a.data, b.data))
    +  }.filter(dist => dist._1 != dist._2) // Filter out the diagonal, this contains no information.
    +    .groupBy(0)
    +    .sortGroup(1, Order.ASCENDING)
    +    .reduceGroup {
    +      distancesIterator => {
    +        val distances = distancesIterator.toList
    +        val distanceVector = distances.map(_._3).toArray
    +
    +        BreezeLabeledVector(distances.head._1, BreezeDenseVector(distanceVector))
    +      }
    +    }
    +
    +  /** Approximate the affinity by fitting a Gaussian-like function
    +    *
    +    * @param dissimilarityVectors The dissimilarity vectors, which represent the distance to the
    +    *                             other vectors in the data set.
    +    * @param resultingParameters  The user-defined parameters of the algorithm
    +    * @return A new set of [[BreezeLabeledVector]] holding the affinity vectors
    +    */
    +  def computeAffinity(dissimilarityVectors: DataSet[BreezeLabeledVector],
    +                      resultingParameters: ParameterMap): DataSet[BreezeLabeledVector] = {
    +    val logPerplexity = Math.log(resultingParameters(Perplexity))
    +    val maxIterations = resultingParameters(MaxIterations)
    +    val errorTolerance = resultingParameters(ErrorTolerance)
    +
    +    dissimilarityVectors.map(vec => {
    +      val breezeVec = binarySearch(vec.data, logPerplexity, maxIterations, errorTolerance)
    +      BreezeLabeledVector(vec.idx, breezeVec)
    +    })
    +  }
    +
    +  /** Normalizes the input vectors so each row sums up to one.
    +    *
    +    * @param affinityVectors The affinity vectors, which quantify the relationship between
    +    *                        the original vectors.
    +    * @return A new set of [[BreezeLabeledVector]] which represents the binding
    +    *         probabilities, i.e. the affinities normalized so that each row sums to one.
    +    */
    +  def computeBindingProbabilities(affinityVectors: DataSet[BreezeLabeledVector]):
    +  DataSet[BreezeLabeledVector] =
    +  affinityVectors.map(vec => BreezeLabeledVector(vec.idx, vec.data :/ sum(vec.data)))
    +
    +  /** Compute the final outlier probability of each vector as the product, over its column,
    +    * of one minus the binding probability.
    +    *
    +    * @param bindingProbabilityVectors The binding probability vectors, where the binding
    +    *                                  probability is based on the affinity and represents the
    +    *                                  probability of a vector binding with another vector.
    +    * @return A set of (index, outlier probability) pairs, one per input vector.
    +    */
    +  def computeOutlierProbability(bindingProbabilityVectors: DataSet[BreezeLabeledVector]):
    +  DataSet[(Int, Double)] = bindingProbabilityVectors
    +    .flatMap(vec => vec.data.toArray.zipWithIndex.map(pair => {
    +
    +      // The DistanceMatrix removed the diagonal, but we need to compute the product
    +      // of the column, so we need to correct the offset.
    +      val columnIndex = if (pair._2 >= vec.idx) {
    +        1
    +      } else {
    +        0
    +      }
    +
    +      (columnIndex + pair._2, pair._1)
    +    })).groupBy(0).reduceGroup {
    +    probabilities => {
    +      var rowNumber = -1
    +      var outlierProbability = 1.0
    +      for (probability <- probabilities) {
    +        rowNumber = probability._1
    +        outlierProbability = outlierProbability * (1.0 - probability._2)
    +      }
    +
    +      (rowNumber, outlierProbability)
    +    }
    +  }
    +
    +  /** Performs a binary search to get affinities in such a way that each conditional Gaussian has
    +    *  the same perplexity.
    +    *
    +    * @param dissimilarityVector The input dissimilarity vector which represents the current
    +    *                            vector distance to the other vectors in the data set
    +    * @param logPerplexity The log of the perplexity, i.e. the target entropy of the affinity
    +    *                      distribution of this vector over the other vectors.
    +    * @param maxIterations The maximum number of iterations, to limit the computational time.
    +    * @param tolerance The allowed tolerance, sacrificing precision for decreased computational
    +    *                  time.
    +    * @param beta The current beta
    +    * @param betaMin The lower bound of beta
    +    * @param betaMax The upper bound of beta
    +    * @param iteration The current iteration
    +    * @return Returns the affinity vector of the input vector.
    +    */
    +  def binarySearch(dissimilarityVector: BreezeVector[Double],
    +                   logPerplexity: Double,
    +                   maxIterations: Int,
    +                   tolerance: Double,
    +                   beta: Double = 1.0,
    +                   betaMin: Double = Double.NegativeInfinity,
    +                   betaMax: Double = Double.PositiveInfinity,
    +                   iteration: Int = 0): BreezeVector[Double] = {
    --- End diff --
    
    Check
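For readers following the pipeline above (dissimilarity, affinity, binding probability, outlier probability), here is a minimal local sketch of the same four steps on plain Scala arrays instead of Flink DataSets and Breeze vectors. It is not the PR's code: the object and method names are hypothetical, and the kernel exp(-beta * d) with an entropy target of log(perplexity) is an assumption, since the body of `binarySearch` is cut off in the diff. The sketch uses plain Euclidean distance, mirroring `euclideanDistance` in `computeDissimilarityVectors`.

```scala
// Hypothetical standalone sketch of the SOS steps (no Flink, no Breeze).
object SosSketch {
  // Step 1: pairwise Euclidean distances with each row's diagonal entry dropped,
  // so row i holds n - 1 distances ordered by the other point's index.
  def dissimilarities(points: Array[Array[Double]]): Array[Array[Double]] =
    points.indices.map { i =>
      points.indices.filter(_ != i).map { j =>
        math.sqrt(points(i).zip(points(j)).map { case (a, b) => (a - b) * (a - b) }.sum)
      }.toArray
    }.toArray

  // Step 2: binary search on beta so that the kernel exp(-beta * d) has entropy
  // log(perplexity). The kernel choice is an assumption about binarySearch.
  def affinity(d: Array[Double], logPerplexity: Double,
               maxIterations: Int = 5000, tolerance: Double = 1e-20): Array[Double] = {
    var beta = 1.0
    var betaMin = Double.NegativeInfinity
    var betaMax = Double.PositiveInfinity
    var a = Array.empty[Double]
    var iteration = 0
    var done = false
    while (!done) {
      a = d.map(x => math.exp(-x * beta))
      val z = a.sum
      // Entropy of the normalized kernel: H = log(Z) + beta * sum(d * a) / Z
      val h = math.log(z) + beta * d.zip(a).map { case (x, y) => x * y }.sum / z
      val diff = h - logPerplexity
      if (iteration >= maxIterations || math.abs(diff) <= tolerance) {
        done = true
      } else {
        if (diff.isNaN) {
          beta /= 10.0 // kernel underflowed to zero: make it wider
        } else if (diff > 0) {
          // Entropy too high (kernel too flat): increase beta
          betaMin = beta
          beta = if (betaMax.isInfinite) beta * 2.0 else (beta + betaMax) / 2.0
        } else {
          // Entropy too low (kernel too peaked): decrease beta
          betaMax = beta
          beta = if (betaMin.isInfinite) beta / 2.0 else (beta + betaMin) / 2.0
        }
        iteration += 1
      }
    }
    a
  }

  // Step 3: normalize an affinity row so it sums to one (binding probabilities).
  def bindingProbabilities(a: Array[Double]): Array[Double] = {
    val s = a.sum
    a.map(_ / s)
  }

  // Step 4: outlier probability of point j = product over rows i != j of (1 - b(i)(j)).
  // Row i lacks its diagonal, so position p maps to column p, or p + 1 once p >= i.
  def outlierProbabilities(b: Array[Array[Double]]): Array[Double] = {
    val result = Array.fill(b.length)(1.0)
    for (i <- b.indices; p <- b(i).indices) {
      val j = if (p >= i) p + 1 else p
      result(j) *= 1.0 - b(i)(p)
    }
    result
  }

  def scores(points: Array[Array[Double]], perplexity: Double): Array[Double] = {
    val logPerplexity = math.log(perplexity)
    val b = dissimilarities(points).map(d => bindingProbabilities(affinity(d, logPerplexity)))
    outlierProbabilities(b)
  }
}
```

Calling `SosSketch.scores` on the five points from the class scaladoc example with perplexity 3 should give the fifth point, (5.0, 8.0), by far the highest score. A `while` loop stands in for the recursive `binarySearch` so that 5000 iterations cannot grow the stack.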


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290218
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Janssens
    +  *
    +  * For more information about SOS, see https://github.com/jeroenjanssens/sos
    +  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +  * Tilburg, the Netherlands, 2012.
    +  *
    +  * @example
    +  *          {{{
    +  *             val inputDS = env.fromCollection(List(
    +  *               LabeledVector(0.0, DenseVector(1.0, 1.0)),
    +  *               LabeledVector(1.0, DenseVector(2.0, 1.0)),
    +  *               LabeledVector(2.0, DenseVector(1.0, 2.0)),
    +  *               LabeledVector(3.0, DenseVector(2.0, 2.0)),
    +  *               LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    +  *             ))
    +  *
    +  *             val sos = StochasticOutlierSelection()
    +  *               .setPerplexity(3)
    +  *
    +  *             val outputDS = sos.transform(inputDS)
    +  *
    +  *             val expectedOutputDS = Array(
    +  *                0.2790094479202896,
    +  *                0.25775014551682535,
    +  *                0.22136130977995766,
    +  *                0.12707053787018444,
    +  *                0.9922779902453757 // The outlier!
    +  *             )
    +  *
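    +  *             // collect() returns (index, outlierness) pairs; order them by index first
    +  *             val outlierness = outputDS.collect().sortBy(_._1).map(_._2)
    +  *             assert(outlierness == expectedOutputDS.toSeq)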
    +  *          }}}
    +  *
    +  * =Parameters=
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    +  *  Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that
    +  *  in SOS, being a neighbor is not a binary property but a probabilistic one. It should be
    +  *  between 1 and n-1, where n is the number of observations.
    +  *  (Default value: '''30''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    +  *  The accepted error tolerance. Increasing this number sacrifices accuracy in return for
    +  *  reduced computational time.
    +  *  (Default value: '''1e-20''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    +  *  The maximum number of iterations to perform. (Default value: '''5000''')
    +  */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +
    +  /** Sets the perplexity of the outlier selection algorithm, which can be seen as the k of kNN.
    +    * For more information, please read the Stochastic Outlier Selection algorithm paper.
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return this instance, to allow method chaining
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
    +
    +  /** Sets the accepted error tolerance, trading accuracy for computational time when computing
    +    * the affinity.
    +    *
    +    * @param errorTolerance the accepted error tolerance with respect to the affinity
    +    * @return this instance, to allow method chaining
    +    */
    +  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = {
    +    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
    +    parameters.add(ErrorTolerance, errorTolerance)
    +    this
    +  }
    +
    +  /** Sets the maximum number of iterations used to approximate the affinity.
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @return this instance, to allow method chaining
    +    */
    +  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
    +    require(maxIterations > 0, "Maximum iterations must be positive.")
    +    parameters.add(MaxIterations, maxIterations)
    +    this
    +  }
    +
    +}
    +
    +object StochasticOutlierSelection extends WithParameters {
    +
    +  // ========================================= Parameters ==========================================
    +  case object Perplexity extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(30)
    +  }
    +
    +  case object ErrorTolerance extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(1e-20)
    +  }
    +
    +  case object MaxIterations extends Parameter[Int] {
    +    val defaultValue: Option[Int] = Some(5000)
    +  }
    +
    +  // ==================================== Factory methods ==========================================
    +
    +  def apply(): StochasticOutlierSelection = {
    +    new StochasticOutlierSelection()
    +  }
    +
    +  // ===================================== Operations ==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[LabeledVector]): DataSet[(Int, Double)] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        val vectorsWithIndex = input.map(labeledVector => {
    +          BreezeLabeledVector(labeledVector.label.toInt, labeledVector.vector.asBreeze)
    +        })
    +
    +        // Don't map back to a LabeledVector, since the algorithm outputs a single double
    +        // per input vector instead of a vector
    +        outlierSelection(vectorsWithIndex, resultingParameters)
    +      }
    +    }
    +  }
    +
    +  /** [[TransformDataSetOperation]] applies the Stochastic Outlier Selection algorithm to a
    +    * [[Vector]], transforming the high-dimensional input into a single Double output.
    +    *
    +    * @tparam T Type of the input data, which has to be a subtype of [[Vector]]
    +    * @return [[TransformDataSetOperation]] producing a single double which represents the
    +    *         outlierness of each input vector, where the output is in [0, 1]
    +    */
    +  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
    +  = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[T]): DataSet[Double] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        // Map to the right format
    +        val vectorsWithIndex = input.zipWithIndex.map(vector => {
    --- End diff --
    
    Yes, I've checked this and it is possible.
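The `Perplexity` documentation in the diff above says the value should lie between 1 and n-1. One way to see why: perplexity is exp of the Shannon entropy of a vector's binding distribution over the other n-1 vectors, which is also why `computeAffinity` works with `Math.log(perplexity)`. It equals 1 when all mass sits on a single neighbor and n-1 when the mass is uniform, hence the "effective number of neighbors" reading. The standalone sketch below (names hypothetical, not from the PR) computes it for a few distributions.

```scala
// Hypothetical helper illustrating perplexity as an effective neighbor count.
object PerplexitySketch {
  // Shannon entropy (natural log) of a probability vector; zero entries contribute nothing.
  def entropy(p: Array[Double]): Double =
    -p.filter(_ > 0.0).map(x => x * math.log(x)).sum

  // Perplexity = exp(entropy): 1 for a one-hot row, k for a uniform row over k entries.
  def perplexity(p: Array[Double]): Double = math.exp(entropy(p))
}
```

For example, a uniform row over four neighbors gives a perplexity of 4, while a row with all mass on one neighbor gives 1.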


[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96290195
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +  // ===================================== Operations ==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] {
    --- End diff --
    
    Good thinking, I've added a scaladoc.


[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on the issue:

    https://github.com/apache/flink/pull/3077
  
    Thanks @tillrohrmann, excellent idea regarding the documentation. I'll also address the code comments; good feedback. I will fix this today or tomorrow.
    
    Cheers, Fokko


[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on the issue:

    https://github.com/apache/flink/pull/3077
  
    @tillrohrmann I've added documentation about the algorithm. Can you check? 
    
    Cheers, Fokko


[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96202871
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
    +  = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, T, Double] {
    +      override def transformDataSet(instance: StochasticOutlierSelection,
    +                                    transformParameters: ParameterMap,
    +                                    input: DataSet[T]): DataSet[Double] = {
    +
    +        val resultingParameters = instance.parameters ++ transformParameters
    +
    +        // Map to the right format
    +        val vectorsWithIndex = input.zipWithIndex.map(vector => {
    --- End diff --
    
    Does it strictly have to be the index you're zipping with? Would a unique ID be enough? If so, then we could use the `DataSetUtils.zipWithUniqueId` method, which is considerably cheaper than `zipWithIndex`.
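To make the cost difference concrete, here is a pure-Scala simulation with no Flink dependency, where a DataSet is modeled as a `Seq` of partitions. As I understand Flink's `DataSetUtils`, `zipWithIndex` needs the size of every preceding partition before it can number anything (an extra pass over the data plus a broadcast of the counts), while `zipWithUniqueId` only needs each subtask's own index. The object and method names below are hypothetical stand-ins, and the shift-based ID layout is an assumption modeled on that idea, not Flink's source.

```scala
object ZipSketch {
  // Dense index: element k of partition p gets (sum of sizes of partitions before p) + k.
  // Computing the offsets requires every partition's size first (an extra pass).
  def zipWithIndex[T](partitions: Seq[Seq[T]]): Seq[(Long, T)] = {
    val offsets = partitions.map(_.size.toLong).scanLeft(0L)(_ + _)
    partitions.zip(offsets).flatMap { case (part, offset) =>
      part.zipWithIndex.map { case (value, k) => (offset + k, value) }
    }
  }

  // Unique ID: element k of partition p gets (k << shift) + p, where shift is the number
  // of bits needed to encode the largest partition index. No partition sizes are needed,
  // so every partition can assign IDs independently, but the IDs have gaps.
  def zipWithUniqueId[T](partitions: Seq[Seq[T]]): Seq[(Long, T)] = {
    val maxIndex = math.max(partitions.size - 1, 0).toLong
    val shift = 64 - java.lang.Long.numberOfLeadingZeros(maxIndex)
    partitions.zipWithIndex.flatMap { case (part, p) =>
      part.zipWithIndex.map { case (value, k) => ((k.toLong << shift) + p, value) }
    }
  }
}
```

With the three partitions `Seq(Seq("a", "b"), Seq("c"), Seq("d", "e", "f"))`, the dense index yields 0 to 5, while the unique IDs come out as 0, 4, 1, 2, 6, 10: unique but with gaps, which should be fine if SOS only uses the ID as a key to match points up.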


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3077: [FLINK-5423] Implement Stochastic Outlier Selection

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on the issue:

    https://github.com/apache/flink/pull/3077
  
    Changes look really good to me :-) Travis is passing modulo an unrelated test failure. Will merge this PR. Thanks a lot for your contribution @Fokko.


[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96202060
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector}
    +import org.apache.flink.api.scala._
    +import org.apache.flink.ml.common.LabeledVector
    +import org.apache.flink.ml.math.DenseVector
    +import org.apache.flink.ml.outlier.StochasticOutlierSelection.BreezeLabeledVector
    +import org.apache.flink.ml.util.FlinkTestBase
    +import org.scalatest.{FlatSpec, Matchers}
    +
    +class StochasticOutlierSelectionITSuite extends FlatSpec with Matchers with FlinkTestBase {
    +  behavior of "Stochastic Outlier Selection algorithm"
    +  val EPSILON = 1e-16
    +
    +  /*
    +    Unit tests created based on the Python scripts of the algorithm's author:
    +    https://github.com/jeroenjanssens/scikit-sos
    +
    +    For more information about SOS, see https://github.com/jeroenjanssens/sos
    +    J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +    Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +    Tilburg, the Netherlands, 2012.
    +   */
    +
    +  val perplexity = 3
    +  val errorTolerance = 0
    +  val maxIterations = 5000
    +  val parameters = new StochasticOutlierSelection().setPerplexity(perplexity).parameters
    +
    +  val env = ExecutionEnvironment.getExecutionEnvironment
    +
    +  it should "Compute the perplexity of the vector and return the correct error" in {
    +    val vector = BreezeDenseVector(Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 9.0, 10.0))
    +
    +    val output = Array(
    +      0.39682901665799636,
    +      0.15747326846175236,
    +      0.06248996227359784,
    +      0.024797830280027126,
    +      0.009840498605275054,
    +      0.0039049953849556816,
    +      6.149323865970302E-4,
    +      2.4402301428445443E-4,
    +      9.683541280042027E-5
    +    )
    +
    +    val search = StochasticOutlierSelection.binarySearch(
    +      vector,
    +      Math.log(perplexity),
    +      maxIterations,
    +      errorTolerance
    +    ).toArray
    +
    +    search should be(output)
    +  }
    +
    +  it should "Compute the distance matrix and give symmetrical distances" in {
    +
    +    val data = env.fromCollection(List(
    +      BreezeLabeledVector(0, BreezeDenseVector(Array(1.0, 3.0))),
    +      BreezeLabeledVector(1, BreezeDenseVector(Array(5.0, 1.0)))
    +    ))
    +
    +    val distanceMatrix = StochasticOutlierSelection
    +      .computeDissimilarityVectors(data)
    +      .map(_.data)
    +      .collect()
    +      .toArray
    +
    +    print(distanceMatrix)
    --- End diff --
    
    I think we should remove the print statement
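Printing a JVM array only shows its runtime type and identity hash code, so the `print` adds nothing to the test. A sketch of what could replace it: assert the symmetry that the test name promises. It uses plain Scala instead of Breeze/Flink, the helper names are hypothetical, and it assumes Euclidean dissimilarity (the main file imports Breeze's `euclideanDistance`).

```scala
object DistanceSketch {
  // Euclidean distance between two equally sized points
  def euclidean(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Full n x n dissimilarity matrix (hypothetical helper, not the PR's API)
  def pairwiseDistances(points: IndexedSeq[Array[Double]]): Array[Array[Double]] =
    points.map(p => points.map(q => euclidean(p, q)).toArray).toArray

  // The property the test name promises: d(i, j) == d(j, i) for all i, j
  def isSymmetric(m: Array[Array[Double]], eps: Double = 1e-12): Boolean =
    m.indices.forall(i => m.indices.forall(j => math.abs(m(i)(j) - m(j)(i)) <= eps))
}
```

For the test's two points (1, 3) and (5, 1), both off-diagonal entries should equal sqrt(20), roughly 4.472, and the diagonal should be zero.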


[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by tillrohrmann <gi...@git.apache.org>.
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96202972
  
    --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/outlier/StochasticOutlierSelection.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.outlier
    +
    +/** An implementation of the Stochastic Outlier Selection algorithm by Jeroen Janssens
    +  *
    +  * For more information about SOS, see https://github.com/jeroenjanssens/sos
    +  * J.H.M. Janssens, F. Huszar, E.O. Postma, and H.J. van den Herik. Stochastic
    +  * Outlier Selection. Technical Report TiCC TR 2012-001, Tilburg University,
    +  * Tilburg, the Netherlands, 2012.
    +  *
    +  * @example
    +  *          {{{
    +  *             val inputDS = env.fromCollection(List(
    +  *               LabeledVector(0.0, DenseVector(1.0, 1.0)),
    +  *               LabeledVector(1.0, DenseVector(2.0, 1.0)),
    +  *               LabeledVector(2.0, DenseVector(1.0, 2.0)),
    +  *               LabeledVector(3.0, DenseVector(2.0, 2.0)),
    +  *               LabeledVector(4.0, DenseVector(5.0, 8.0)) // The outlier!
    +  *             ))
    +  *
    +  *             val sos = StochasticOutlierSelection()
    +  *               .setPerplexity(3)
    +  *
    +  *             val outputDS = sos.transform(inputDS)
    +  *
    +  *             val expectedOutputDS = Array(
    +  *                0.2790094479202896,
    +  *                0.25775014551682535,
    +  *                0.22136130977995766,
    +  *                0.12707053787018444,
    +  *                0.9922779902453757 // The outlier!
    +  *             )
    +  *
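    +  *             // transform yields (label, outlier probability) pairs; sort by label before comparing
    +  *             val outlierProbabilities = outputDS.collect().sortBy(_._1).map(_._2)
    +  *             assert(outlierProbabilities == expectedOutputDS.toSeq)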
    +  *          }}}
    +  *
    +  * =Parameters=
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.Perplexity]]:
    +  *  Perplexity can be interpreted as the k in k-nearest neighbor algorithms. The difference is that
    +  *  in SOS, being a neighbor is not a binary property but a probabilistic one. It should be between
    +  *  1 and n-1, where n is the number of observations.
    +  *  (Default value: '''30''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.ErrorTolerance]]:
    +  *  The accepted error tolerance. Increasing this number sacrifices accuracy in return for
    +  *  reduced computation time.
    +  *  (Default value: '''1e-20''')
    +  *
    +  *  - [[org.apache.flink.ml.outlier.StochasticOutlierSelection.MaxIterations]]:
    +  *  The maximum number of iterations to perform. (Default value: '''5000''')
    +  */
    +
    +import breeze.linalg.functions.euclideanDistance
    +import breeze.linalg.{sum, DenseVector => BreezeDenseVector, Vector => BreezeVector}
    +import org.apache.flink.api.common.operators.Order
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.scala._
    +import org.apache.flink.api.scala.utils._
    +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap, WithParameters}
    +import org.apache.flink.ml.math.Breeze._
    +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
    +import org.apache.flink.ml.pipeline.{TransformDataSetOperation, Transformer}
    +
    +import scala.language.implicitConversions
    +import scala.reflect.ClassTag
    +
    +class StochasticOutlierSelection extends Transformer[StochasticOutlierSelection] {
    +
    +  import StochasticOutlierSelection._
    +
    +
    +  /** Sets the perplexity of the outlier selection algorithm, which can be seen as the k of kNN.
    +    * For more information, please read the Stochastic Outlier Selection paper.
    +    *
    +    * @param perplexity the perplexity of the affinity fit
    +    * @return
    +    */
    +  def setPerplexity(perplexity: Double): StochasticOutlierSelection = {
    +    require(perplexity >= 1, "Perplexity must be at least one.")
    +    parameters.add(Perplexity, perplexity)
    +    this
    +  }
    +
    +  /** Sets the accepted error tolerance, trading accuracy for computation time when computing the affinity
    +    *
    +    * @param errorTolerance the accepted error tolerance with respect to the affinity
    +    * @return
    +    */
    +  def setErrorTolerance(errorTolerance: Double): StochasticOutlierSelection = {
    +    require(errorTolerance >= 0, "Error tolerance cannot be negative.")
    +    parameters.add(ErrorTolerance, errorTolerance)
    +    this
    +  }
    +
    +  /** Sets the maximum number of iterations used to approximate the affinity
    +    *
    +    * @param maxIterations the maximum number of iterations
    +    * @return
    +    */
    +  def setMaxIterations(maxIterations: Int): StochasticOutlierSelection = {
    +    require(maxIterations > 0, "Maximum iterations must be positive.")
    +    parameters.add(MaxIterations, maxIterations)
    +    this
    +  }
    +
    +}
    +
    +object StochasticOutlierSelection extends WithParameters {
    +
    +  // ========================================= Parameters ==========================================
    +  case object Perplexity extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(30)
    +  }
    +
    +  case object ErrorTolerance extends Parameter[Double] {
    +    val defaultValue: Option[Double] = Some(1e-20)
    +  }
    +
    +  case object MaxIterations extends Parameter[Int] {
    +    val defaultValue: Option[Int] = Some(5000)
    +  }
    +
    +  // ==================================== Factory methods ==========================================
    +
    +  def apply(): StochasticOutlierSelection = {
    +    new StochasticOutlierSelection()
    +  }
    +
    +  // ===================================== Operations ==============================================
    +  case class BreezeLabeledVector(idx: Int, data: BreezeVector[Double])
    +
    +  implicit val transformLabeledVectors = {
    +    new TransformDataSetOperation[StochasticOutlierSelection, LabeledVector, (Int, Double)] {
    --- End diff --
    
    Maybe we could add a comment that the label of `LabeledVector` should be the index or a unique integer value.
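For context on what the transform computes per input point, here is a compact, single-machine sketch of SOS in plain Scala, following the formulation in the Janssens et al. report and scikit-sos as I understand it: Euclidean dissimilarities; a per-point binary search on the precision beta so that the entropy of the affinities exp(-beta * d) matches log(perplexity); row-normalized binding probabilities b_ij; and an outlier probability p_i = prod_{j != i} (1 - b_ji). The names `SosSketch`, `binarySearch` and `outlierProbabilities` are hypothetical (the `binarySearch` signature only mirrors the call in the test suite). This is not the PR's implementation, and it holds the full n x n matrix in memory. The defaults of 5000 iterations and a tolerance of 1e-20 mirror the PR's parameter defaults.

```scala
object SosSketch {
  // Euclidean distance between two equally sized points
  def euclidean(a: Array[Double], b: Array[Double]): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Entropy (in nats) of the distribution proportional to exp(-beta * d),
  // returned together with the unnormalized affinities exp(-beta * d)
  private def entropy(d: Array[Double], beta: Double): (Double, Array[Double]) = {
    val a = d.map(x => math.exp(-x * beta))
    val sumA = a.sum
    val h = math.log(sumA) + beta * d.zip(a).map { case (x, y) => x * y }.sum / sumA
    (h, a)
  }

  // Binary search on beta until the entropy matches logPerplexity (or maxIterations
  // is reached); returns the unnormalized affinities of one point to all others
  def binarySearch(d: Array[Double], logPerplexity: Double,
                   maxIterations: Int, tolerance: Double): Array[Double] = {
    var beta = 1.0
    var betaMin = Double.NegativeInfinity
    var betaMax = Double.PositiveInfinity
    var current = entropy(d, beta)
    var diff = current._1 - logPerplexity
    var iteration = 0
    while ((diff.isNaN || math.abs(diff) > tolerance) && iteration < maxIterations) {
      if (diff.isNaN) {
        beta /= 10.0 // all affinities underflowed to zero: back off
      } else if (diff > 0) {
        // entropy too high (distribution too flat): increase beta
        betaMin = beta
        beta = if (betaMax.isPosInfinity) beta * 2.0 else (beta + betaMax) / 2.0
      } else {
        // entropy too low (distribution too peaked): decrease beta
        betaMax = beta
        beta = if (betaMin.isNegInfinity) beta / 2.0 else (beta + betaMin) / 2.0
      }
      current = entropy(d, beta)
      diff = current._1 - logPerplexity
      iteration += 1
    }
    current._2
  }

  def outlierProbabilities(points: IndexedSeq[Array[Double]], perplexity: Double,
                           maxIterations: Int = 5000,
                           tolerance: Double = 1e-20): IndexedSeq[Double] = {
    // binding(i)(j): probability that point i picks point j as its neighbor (j != i)
    val binding: IndexedSeq[Map[Int, Double]] = points.indices.map { i =>
      val others = points.indices.filter(_ != i)
      val d = others.map(j => euclidean(points(i), points(j))).toArray
      val a = binarySearch(d, math.log(perplexity), maxIterations, tolerance)
      val sumA = a.sum
      others.zip(a.map(_ / sumA).toIndexedSeq).toMap
    }
    // p(i) = product over j != i of (1 - b(j)(i)): the chance that no other point binds to i
    points.indices.map { i =>
      points.indices.filter(_ != i).map(j => 1.0 - binding(j)(i)).product
    }
  }
}
```

Because every point is identified by its position in `points`, two inputs sharing an ID would be indistinguishable. In the distributed transform that role is played by `labeledVector.label.toInt`, and since `toInt` truncates, labels such as 1.0 and 1.5 would both become 1. Run on the five points from the scaladoc example with perplexity 3, the sketch should rank (5, 8) as the clear outlier.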


[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by Fokko <gi...@git.apache.org>.
Github user Fokko commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3077#discussion_r96289143
  
    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/outlier/StochasticOutlierSelectionITSuite.scala ---
    @@ -0,0 +1,240 @@
    +    val distanceMatrix = StochasticOutlierSelection
    +      .computeDissimilarityVectors(data)
    +      .map(_.data)
    +      .collect()
    +      .toArray
    +
    +    print(distanceMatrix)
    --- End diff --
    
    Oops, still in there from the debugging.


[GitHub] flink pull request #3077: [FLINK-5423] Implement Stochastic Outlier Selectio...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3077

