Posted to reviews@spark.apache.org by LIDIAgroup <gi...@git.apache.org> on 2014/03/24 17:46:58 UTC

[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

GitHub user LIDIAgroup opened a pull request:

    https://github.com/apache/spark/pull/216

    [SPARK-1303]  [MLLIB] Added discretization capability to MLlib.

    https://spark-project.atlassian.net/browse/SPARK-1303

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/LIDIAgroup/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/216.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #216
    
----
commit 19986353b9197a84092b1d4765ae36986b8d93e8
Author: LIDIAgroup <hm...@gmail.com>
Date:   2014-03-24T16:40:44Z

    Added Entropy Minimization Discretization.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10897487
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EMDDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EMDDiscretizer private (
    --- End diff --
    
    The name `EMD` is not a common acronym for the algorithm. `MDLP` was used in the paper and `MDL` was used in derived work. But I do think `MDL` is more confusing. Shall we call it `EntropyMinimizationDiscretizer`?



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11196809
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +private[discretization] object ArrayAccumulator extends AccumulatorParam[Array[Long]] {
    +
    +  def addInPlace(array1: Array[Long], array2: Array[Long]): Array[Long] = {
    +    array2.clone
    --- End diff --
    
    I want to replace the previous value with the new one. I'm not sure which one is more efficient: 
    ```
    array2.clone
    ```
     or 
    ```
    for (i <- 0 until array1.size) array1(i) = array2(i)
    array1
    ```
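
The two options differ in more than speed: `clone` allocates a fresh array on every call, while the loop overwrites `array1` in place. A minimal sketch of both in plain Scala, without Spark. The object and method names are illustrative and not part of the pull request, and `Array.copy` is swapped in for the hand-written loop:

```scala
// Illustrative only: ReplaceStrategies and its methods are hypothetical names.
object ReplaceStrategies {

  // Option 1: return a fresh copy of the new value; array1 is left untouched.
  def replaceByClone(array1: Array[Long], array2: Array[Long]): Array[Long] =
    array2.clone

  // Option 2: overwrite array1 in place and return it; no new allocation.
  // Assumes both arrays have the same length.
  def replaceInPlace(array1: Array[Long], array2: Array[Long]): Array[Long] = {
    require(array1.length == array2.length, "arrays must have the same length")
    Array.copy(array2, 0, array1, 0, array2.length)
    array1
  }
}
```

Which variant is faster is likely to depend on the array size and the JVM, so it is probably best settled by measuring on representative data.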



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947648
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +private[discretization] object MapAccumulator extends AccumulatorParam[Map[String, Int]] {
    --- End diff --
    
    Use `Long` instead of `Int` to avoid overflow.
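
To see why this matters: Scala's `Int` is 32-bit and wraps around silently once a sum passes `Int.MaxValue`. A small sketch in plain Scala. `CountOverflow` and its methods are hypothetical names, not code from the pull request:

```scala
// Illustrative only: CountOverflow is a hypothetical helper.
object CountOverflow {

  // Summing counts as Int wraps around once the total exceeds Int.MaxValue.
  def sumAsInt(counts: Seq[Int]): Int = counts.sum

  // Widening each count to Long before summing keeps the total correct.
  def sumAsLong(counts: Seq[Int]): Long = counts.map(_.toLong).sum

  // Merge two frequency maps by summing the Long counts of each key.
  def merge(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
    b.foldLeft(a) { case (acc, (key, count)) =>
      acc.updated(key, acc.getOrElse(key, 0L) + count)
    }
}
```

With `Seq(Int.MaxValue, 1)` as input, `sumAsInt` wraps to a negative number while `sumAsLong` returns the expected total.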



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38471741
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10946507
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    --- End diff --
    
    Better use `mutable.Map` instead of `Map` to be explicit.
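
For comparison, a sketch of the two styles in plain Scala. `ThresholdCache` and its members are hypothetical names used only for illustration; the field in the pull request is the `var thresholds` quoted above:

```scala
import scala.collection.mutable

// Illustrative only: ThresholdCache is a hypothetical class.
class ThresholdCache {

  // Style in the diff: an immutable Map held in a var. `+=` rebinds the var
  // to a new Map, which is easy to misread as an in-place update.
  private var immutableStore = Map.empty[Int, Seq[Double]]

  // Suggested style: a mutable.Map held in a val. The type itself signals
  // that the collection is updated in place.
  private val mutableStore = mutable.Map.empty[Int, Seq[Double]]

  def put(feature: Int, thresholds: Seq[Double]): Unit = {
    immutableStore += ((feature, thresholds)) // new Map, reassigned to the var
    mutableStore(feature) = thresholds         // same Map, updated in place
  }

  def fromImmutable(feature: Int): Option[Seq[Double]] = immutableStore.get(feature)
  def fromMutable(feature: Int): Option[Seq[Double]] = mutableStore.get(feature)
}
```

Both stores end up with the same contents; the difference is only in how clearly the code states that the map changes over time.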



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11019805
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    --- End diff --
    
    I ran a very simple test to determine which ratio was fastest in my case. I expect the best value to differ from cluster to cluster, which is why it is configurable.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10954204
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretize the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    +        val thresholdsForFeature = this.getThresholds(initial_candidates)
    +        this.thresholds += ((feature, thresholdsForFeature))
    +        thresholdsForFeature
    +    }
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretize the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
    +    for (feature <- features diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds.filter({ features.contains(_) })
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretize the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
    +    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds
    +  }
    +
    +  /**
    +   * Calculates the initial candidate thresholds for a feature
    +   * @param data RDD of (value, label) pairs
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
    +   */
    +  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = ""
    +      var result = Seq.empty[(Double, Map[String, Int])]
    +      var freqs = Map.empty[String, Int]
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = freqs.empty + ((y, 1))
    +        } else {
    +          // we continue on the same interval
    +          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience 
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = {
    +
    +    //Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    //Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    --- End diff --
    
    Could you comment on what this is for?
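
Judging only from the quoted diff, `partitions` maps a candidate count `n` to ceil(n / elementsPerPartition), and `coalesce` then reduces the filtered RDD to that many partitions. The arithmetic can be checked in isolation. This sketch uses plain Scala and a hypothetical `PartitionCount` object, with the default of 18000 taken from the constructor shown above:

```scala
// Illustrative only: PartitionCount is a hypothetical name; the formula
// mirrors the `partitions` function in the quoted diff.
object PartitionCount {

  // Smallest number of partitions such that none holds more than
  // elementsPerPartition elements.
  def partitionsFor(nElements: Long, elementsPerPartition: Int = 18000): Int =
    math.ceil(nElements.toDouble / elementsPerPartition).toInt
}
```

Under this formula a count of exactly 18000 still fits in one partition, and one more candidate tips it over to two.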



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10941192
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * DiscretizerModel provides a template with the basic methods for future discretizers.
    + */
    +trait DiscretizerModel extends Serializable {
    +  /**
    +   * Return the thresholds used to discretize the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double]
    +
    +  /**
    +   * Return the thresholds used to discretize the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]]
    +
    +  /**
    +   * Return the thresholds used to discretize the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]]
    +
    +  /**
    +   * Discretizes an RDD
    +   */
    +  def discretize: RDD[_]
    --- End diff --
    
    This method is not clear to me. It takes no argument and returns an RDD of undefined type. A `DiscretizerModel` may be trained on one RDD but applied to another RDD.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10948196
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +object Utils {
    +
    +  implicit class MyRichSeq[T](val seq: Seq[T]) extends AnyVal {
    --- End diff --
    
    Sorry, is it used somewhere?



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38727471
  
    @LIDIAgroup For the second item, it is very common to have different training and discretizing data. For example, we have a labeled dataset containing a subset of members, and we want to train a model based on it and make predictions for other members. A discretizer is trained from the labeled dataset, and then a predictive model is trained using the discretized features. The next step is to make predictions, where we have to apply the same discretization to other member feature vectors.
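
A sketch of the separation described here, written with plain Scala collections rather than RDDs so that it runs on its own. Everything in it is hypothetical (`ThresholdModel`, `binIndex`, `discretize`, and the example thresholds). It is not the API of the pull request, only one way a model trained on one dataset could be applied to another:

```scala
// Illustrative only: a model that holds per-feature thresholds learned elsewhere.
final case class ThresholdModel(thresholds: Map[Int, Seq[Double]]) {

  // Index of the interval that `value` falls into: the number of thresholds
  // strictly below the value (thresholds are assumed sorted ascending).
  def binIndex(feature: Int, value: Double): Int =
    thresholds.get(feature).map(ts => ts.count(_ < value)).getOrElse(0)

  // Discretize one feature vector; features without thresholds pass through.
  def discretize(features: Array[Double]): Array[Double] =
    features.zipWithIndex.map { case (value, i) =>
      if (thresholds.contains(i)) binIndex(i, value).toDouble else value
    }
}
```

Because the model carries only the thresholds, the same instance can be applied to training vectors and to unlabeled vectors at prediction time.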



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11000756
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.util
    +
    +import org.apache.spark.mllib.discretization.Utils
    +
    +/**
    + * Object with some Information Theory methods.
    + */
    +object InfoTheory {
    --- End diff --
    
    We can open a separate PR and discuss the design for information theory there. Mixing it into the current PR would certainly delay the review. For example, we need to discuss whether `entropy(Seq[Int])` is a proper interface: what if we want to compute the entropy of a `Seq[Double]`? So please keep the number of public interfaces as small as possible.
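
To illustrate the interface question, here is one possible shape: entropy computed from class frequencies given as `Long` counts. This is a sketch under assumptions, not the `InfoTheory` code from the pull request (which is not quoted here). `EntropySketch`, `entropy`, and the choice of base-2 logarithm are all illustrative:

```scala
// Illustrative only: EntropySketch is a hypothetical object.
object EntropySketch {

  // Shannon entropy in bits of a distribution given as class frequencies:
  // H = -sum_i p_i * log2(p_i), where p_i = freq_i / total.
  // Zero frequencies contribute nothing and are skipped.
  def entropy(freqs: Seq[Long]): Double = {
    val total = freqs.sum.toDouble
    if (total <= 0) 0.0
    else freqs.filter(_ > 0).map { f =>
      val p = f / total
      -p * math.log(p) / math.log(2)
    }.sum
  }
}
```

A signature based on counts would not cover a `Seq[Double]` of probabilities or weights, which is the kind of design question the comment suggests settling separately.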



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16054027
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +
    +  def generateLabeledData: Array[LabeledPoint] = {
    +    val rnd = new Random(42)
    +    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +
    +    Array.fill[LabeledPoint](nDatapoints) {
    +      LabeledPoint(labels(rnd.nextInt(nLabels)),
    +        Array.fill[Double](nFeatures)(rnd.nextDouble))
    +    }
    +  }
    +}
    +
    +class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +  test("EMD discretization") {
    +    val rnd = new Random(13)
    +        
    +    val data = for (i <- 1 to 99) yield
    +      if (i <= 33) {
    +        LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +      } else if (i <= 66) {
    +        LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +      } else {
    +        LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    --- End diff --
    
    same as above



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11168017
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +
    +  def generateLabeledData: Array[LabeledPoint] = {
    +    val rnd = new Random(42)
    +    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +
    +    Array.fill[LabeledPoint](nDatapoints) {
    +      LabeledPoint(labels(rnd.nextInt(nLabels)),
    +        Array.fill[Double](nFeatures)(rnd.nextDouble))
    +    }
    +  }
    +}
    +
    +class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +  test("EMD discretization") {
    +    val rnd = new Random(13)
    +		
    +		val data = for (i <- 1 to 99) yield
    +			if (i <= 33) {
    +			    LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +			} else if (i <= 66) {
    +			    LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +			} else {
    +			    LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +			}
    +		
    +		val shuffledData = data.sortWith((lp1, lp2) => rnd.nextDouble < 0.5)
    +		
    +		val rdd = sc.parallelize(shuffledData, 3)
    +
    +		val discretizer = EntropyMinimizationDiscretizer.train(rdd, Seq(0))
    +				
    +		val thresholdsArray = discretizer.thresholds(0).toArray
    +		if (math.abs(thresholdsArray(1) - 33.5) > 1.55) {
    +		    fail("Selected thresholds aren't what they should be.")
    +		}
    +		if (math.abs(thresholdsArray(2) - 66.5) > 1.55) {
    +		    fail("Selected thresholds aren't what they should be.")
    +		}
    +  }
    +    
    +}
    --- End diff --
    
    Add a newline at the end of the file.


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-54694750
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10948283
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +object Utils {
    +
    +  implicit class MyRichSeq[T](val seq: Seq[T]) extends AnyVal {
    +
    +    def apply(indexes: Seq[Int]): Option[Seq[T]] = {
    +      if (indexes.length == 0) {
    +        None
    +      } else {
    +        Some(indexes.map(i => seq(i)))
    +      }
    +    }
    +  }
    +
    +  def sumFreqMaps[A](map1: Map[A, Int],
    +      map2: Map[A, Int]) = {
    +    if (map1 isEmpty) {
    +      map2
    +    } else if (map2 isEmpty) {
    +      map1
    +    } else {
    +      Map.empty[A, Int] ++
    +        (for ((y1, x1) <- map1; (y2, x2) <- map2 if (y1 == y2))
    --- End diff --
    
    Same O(n^2) complexity problem here.
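
The nested `for` over both maps compares every key of `map1` with every key of `map2`. One way to avoid that, shown here only as a sketch, is to fold the smaller map into the larger one with hash lookups. The object name `FreqMaps` is hypothetical and is not part of the pull request.

```scala
// Hypothetical alternative to sumFreqMaps; the object name is illustrative only.
object FreqMaps {
  /** Merges two frequency maps with one hash lookup per entry of the smaller map. */
  def sumFreqMaps[A](map1: Map[A, Int], map2: Map[A, Int]): Map[A, Int] = {
    // Fold the smaller map into the larger one to minimise the number of updates.
    val (small, large) = if (map1.size <= map2.size) (map1, map2) else (map2, map1)
    small.foldLeft(large) { case (acc, (key, count)) =>
      // Keys missing from `acc` start at 0, so no separate set-difference pass is needed.
      acc.updated(key, acc.getOrElse(key, 0) + count)
    }
  }
}
```

The empty-map special cases from the original fall out naturally: folding an empty map returns the other map unchanged.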


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38472415
  
    One or more automated tests failed
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13400/


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10898285
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala ---
    @@ -0,0 +1,60 @@
    +package org.apache.spark.mllib.discretization
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.util.Random
    +import org.apache.spark.mllib.util.InfoTheory
    --- End diff --
    
    Imports should be organized.


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by leizongxiong <gi...@git.apache.org>.
Github user leizongxiong commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-61396457
  
    Can this branch be published with the Spark 1.2.0 release? @mengxr



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947062
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * DiscretizerModel provides a template with the basic methods for future discretizers.
    + */
    +trait DiscretizerModel extends Serializable {
    +  /**
    +   * Return the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    --- End diff --
    
    `the number of the feature` is uncommon phrasing. Please change it to "the index of the feature", and rename the argument below accordingly.


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by leizongxiong <gi...@git.apache.org>.
Github user leizongxiong commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r19710343
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +
    +/**
    + * This class contains methods to calculate thresholds to discretize continuous values with the
    + * method proposed by Fayyad and Irani in Multi-Interval Discretization of Continuous-Valued
    + * Attributes (1993).
    + *
    + * @param continuousFeaturesIndexes Indexes of features to be discretized.
    + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition.
    + * @param maxBins Maximum number of bins for each discretized feature.
    + */
    +class EntropyMinimizationDiscretizer private (
    +    val continuousFeaturesIndexes: Seq[Int],
    +    val elementsPerPartition: Int,
    +    val maxBins: Int)
    +  extends Serializable {
    +
    +  private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +  private val log2 = { x: Double => math.log(x) / math.log(2) }
    +
    +  /**
    +   * Run the algorithm with the configured parameters on an input.
    +   * @param data RDD of LabeledPoint's.
    +   * @return A EntropyMinimizationDiscretizerModel with the thresholds to discretize.
    +   */
    +  def run(data: RDD[LabeledPoint]) = {
    +    val labels2Int = data.context.broadcast(data.map(_.label).distinct.collect.zipWithIndex.toMap)
    +    val nLabels = labels2Int.value.size
    +
    +    var thresholds = Map.empty[Int, Seq[Double]]
    +    for (i <- continuousFeaturesIndexes) {
    +      val featureValues = data.map({
    +        case LabeledPoint(label, values) => (values(i), labels2Int.value(label))
    +      })
    +      val sortedValues = featureValues.sortByKey()
    +      val initialCandidates = initialThresholds(sortedValues, nLabels)
    +      val thresholdsForFeature = this.getThresholds(initialCandidates, nLabels)
    +      thresholds += ((i, thresholdsForFeature))
    +    }
    +
    +    new EntropyMinimizationDiscretizerModel(thresholds)
    +
    +  }
    +
    +  /**
    +   * Calculates the initial candidate treholds for a feature
    +   * @param data RDD of (value, label) pairs.
    +   * @param nLabels Number of distinct labels in the dataset.
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs.
    +   */
    +  private def initialThresholds(data: RDD[(Double, Int)], nLabels: Int) = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = Int.MinValue
    +      var result = Seq.empty[(Double, Array[Long])]
    +      var freqs = Array.fill[Long](nLabels)(0L)
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = Array.fill[Long](nLabels)(0L)
    +          freqs(y) = 1L
    +        } else {
    +          // we continue on the same interval
    +          freqs(y) += 1
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Array[Long])], nLabels: Int): Seq[Double] = {
    +
    +    // Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    // Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh, nLabels) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Array[Long])],
    +      lastSelected : Option[Double],
    +      nLabels: Int) = {
    +
    +    val numPartitions = candidates.partitions.size
    +
    +    val sc = candidates.sparkContext
    +
    +    // store total frequencies for each partition
    +    val totals = sc.runJob(candidates, { case it =>
    +      val accum = Array.fill(nLabels)(0L)
    +      for ((_, freqs) <- it) {
    +        for (i <- 0 until nLabels) accum(i) += freqs(i)
    +      }
    +      accum
    +    }: (Iterator[(Double, Array[Long])]) => Array[Long])
    +
    +    val bcTotals = sc.broadcast(totals)
    +
    +    val result =
    +      candidates.mapPartitionsWithIndex({ (slice, it) =>
    +
    +        // accumulate freqs from left to right
    +        val leftTotal = Array.fill(nLabels)(0L)
    +        for (i <- 0 until slice) {
    +          for (j <- 0 until nLabels) leftTotal(j) += bcTotals.value(i)(j)
    +        }
    +
    +        var leftAccum = Seq.empty[(Double, Array[Long], Array[Long])]
    +
    +        for ((cand, freqs) <- it) {
    +          for (i <- 0 until nLabels) leftTotal(i) += freqs(i)
    +          leftAccum = (cand, freqs, leftTotal.clone) +: leftAccum
    +        }
    +
    +        // accumulate freqs from right to left
    +        val rightTotal = Array.fill(nLabels)(0L)
    +        for (i <- slice + 1 until numPartitions) {
    +          for (j <- 0 until nLabels) leftTotal(j) += bcTotals.value(i)(j)
    --- End diff --
    
    Is `leftTotal(j) += bcTotals.value(i)(j)` correct here? I think it should be `rightTotal(j) += bcTotals.value(i)(j)`. @LIDIAgroup
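
To make the point concrete, here is a plain-Scala sketch (no Spark) of the two accumulations. The `totals` array stands in for `bcTotals.value` from the diff, and the object and method names are hypothetical. It illustrates the intended behaviour under those assumptions and is not the patch itself.

```scala
// Plain-Scala sketch: totals(p) holds the per-label counts of partition p,
// standing in for bcTotals.value in the quoted diff.
object PartitionTotals {
  /** Sums the per-label counts of all partitions strictly before `slice`. */
  def leftOf(totals: Array[Array[Long]], slice: Int, nLabels: Int): Array[Long] = {
    val leftTotal = Array.fill(nLabels)(0L)
    for (i <- 0 until slice; j <- 0 until nLabels) leftTotal(j) += totals(i)(j)
    leftTotal
  }

  /** Sums the per-label counts of all partitions strictly after `slice`. */
  def rightOf(totals: Array[Array[Long]], slice: Int, nLabels: Int): Array[Long] = {
    val rightTotal = Array.fill(nLabels)(0L)
    // The right-hand pass must accumulate into rightTotal, not leftTotal.
    for (i <- slice + 1 until totals.length; j <- 0 until nLabels) rightTotal(j) += totals(i)(j)
    rightTotal
  }
}
```

With the line as written in the diff, `rightTotal` would stay all zeros and `leftTotal` would absorb the counts of the later partitions as well.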



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10951916
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    --- End diff --
    
    Why do we need conversion to String?
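
One possible alternative to the `String` conversion is to map each distinct label to a dense `Int` index. A later revision of this file, quoted elsewhere in this thread, does this with a broadcast `labels2Int` map. The sketch below shows the idea on a plain `Seq` with no Spark; the object name `LabelIndex` is hypothetical.

```scala
// Plain-Scala sketch: map each distinct Double label to a dense Int index,
// so labels can be used as array offsets without going through String.
object LabelIndex {
  /** Assigns indices 0, 1, 2, ... to labels in order of first appearance. */
  def build(labels: Seq[Double]): Map[Double, Int] =
    labels.distinct.zipWithIndex.toMap
}
```

The resulting indices can address a fixed-size frequency array directly, which is how the later revision uses `freqs(y)`.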


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10951390
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +object Utils {
    +
    +  implicit class MyRichSeq[T](val seq: Seq[T]) extends AnyVal {
    +
    +    def apply(indexes: Seq[Int]): Option[Seq[T]] = {
    +      if (indexes.length == 0) {
    +        None
    +      } else {
    +        Some(indexes.map(i => seq(i)))
    +      }
    +    }
    +  }
    +
    +  def sumFreqMaps[A](map1: Map[A, Int],
    +      map2: Map[A, Int]) = {
    +    if (map1 isEmpty) {
    +      map2
    +    } else if (map2 isEmpty) {
    +      map1
    +    } else {
    +      Map.empty[A, Int] ++
    +        (for ((y1, x1) <- map1; (y2, x2) <- map2 if (y1 == y2))
    +          yield ((y1, x1 + x2))) ++
    +        (for (y <- (map1.keySet diff map2.keySet))
    +          yield ((y, map1(y)))) ++
    +        (for (y <- (map2.keySet diff map1.keySet))
    +          yield ((y, map2(y))))
    +    }
    +  }
    +
    +  @inline def log2(x: Double) = {
    --- End diff --
    
    Move this function to `EntropyMinimizationDiscretizer` as well.


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38988486
  
    @LIDIAgroup Thanks for the update! The new code didn't pass the style check. Please run `sbt/sbt scalastyle` to see the error messages. I saw the following in the Travis log, and there may be others:
    
    ~~~
    error file=/home/travis/build/apache/spark/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala message=insert.a.single.space.after.comment.start.message line=109 column=4
    error file=/home/travis/build/apache/spark/mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala message=insert.a.single.space.after.comment.start.message line=112 column=4
    ~~~


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16054011
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +
    +  def generateLabeledData: Array[LabeledPoint] = {
    +    val rnd = new Random(42)
    +    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +
    +    Array.fill[LabeledPoint](nDatapoints) {
    +      LabeledPoint(labels(rnd.nextInt(nLabels)),
    +        Array.fill[Double](nFeatures)(rnd.nextDouble))
    +    }
    +  }
    +}
    +
    +class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +  test("EMD discretization") {
    +    val rnd = new Random(13)
    +        
    +    val data = for (i <- 1 to 99) yield
    +      if (i <= 33) {
    +        LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    --- End diff --
    
    Use `Vectors.dense(Array...)` here.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16053868
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerModel.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * This class provides the methods to discretize data with the given thresholds.
    + * @param thresholds Thresholds used to discretize.
    + */
    +class EntropyMinimizationDiscretizerModel (val thresholds: Map[Int, Seq[Double]])
    +  extends DiscretizerModel[LabeledPoint] with Serializable {
    +
    +  /**
    +   * Discretizes values for the given data set using the model trained.
    +   *
    +   * @param data Data point to discretize.
    +   * @return Data point with values discretized
    +   */
    +  override def discretize(data: LabeledPoint): LabeledPoint = {
    +    val newValues = data.features.zipWithIndex.map({ case (value, i) =>
    +      if (this.thresholds.keySet contains i) {
    +        assignDiscreteValue(value, thresholds(i))
    +      } else {
    +        value
    +      }
    +    })
    +    LabeledPoint(data.label, newValues)
    --- End diff --
    
    Replace `newValues` with `Vectors.dense(newValues)`.




[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947573
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    +        val thresholdsForFeature = this.getThresholds(initial_candidates)
    +        this.thresholds += ((feature, thresholdsForFeature))
    +        thresholdsForFeature
    +    }
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
    +    for (feature <- features diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds.filter({ features.contains(_) })
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
    +    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds
    +  }
    +
    +  /**
    +   * Calculates the initial candidate treholds for a feature
    +   * @param data RDD of (value, label) pairs
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
    +   */
    +  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = ""
    +      var result = Seq.empty[(Double, Map[String, Int])]
    +      var freqs = Map.empty[String, Int]
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = freqs.empty + ((y, 1))
    +        } else {
    +          // we continue on the same interval
    +          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience 
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = {
    +
    +    //Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    //Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Map[String, Int])],
    +      lastSelected : Option[Double]) = {
    +
    +    var result = candidates.map({
    +      case (cand, freqs) =>
    +        (cand, freqs, Seq.empty[Int], Seq.empty[Int])
    +    }).cache
    +
    +    val numPartitions = candidates.partitions.size
    +    val bc_numPartitions = candidates.context.broadcast(numPartitions)
    +
    +    // stores accumulated freqs from left to right
    +    val l_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +    // stores accumulated freqs from right to left
    +    val r_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +
    +    // calculates accumulated frequencies for each candidate
    +    (0 until numPartitions) foreach { l2r_i =>
    +
    +      val bc_l_total = l_total.value
    +      val bc_r_total = r_total.value
    +
    +      result =
    +        result.mapPartitionsWithIndex({ (slice, it) =>
    +
    +          val l2r = slice == l2r_i
    +          val r2l = slice == bc_numPartitions.value - 1 - l2r_i
    +
    +          if (l2r && r2l) {
    +
    +            // accumulate both from left to right and right to left
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +
    +            for ((cand, freqs, _, r_freqs) <- it) {
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            l_total += accum
    +
    +            val r2lIt = partialResult.iterator
    +            partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            accum = Map.empty[String, Int]
    +            for ((cand, freqs, l_freqs, _) <- r2lIt) {
    +              val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +            r_total += accum
    +
    +            partialResult.iterator
    +
    +          } else if (l2r) {
    +
    +            // accumulate freqs from left to right
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +
    +            for ((cand, freqs, _, r_freqs) <- it) {
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            l_total += accum
    +            partialResult.reverseIterator
    +
    +          } else if (r2l) {
    +
    +            // accumulate freqs from right to left
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +            val r2lIt = it.toSeq.reverseIterator
    +
    +            for ((cand, freqs, l_freqs, _) <- r2lIt) {
    +              val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            r_total += accum
    +
    +            partialResult.iterator
    +
    +          } else {
    +            // do nothing in this iteration
    +            it
    +          }
    +        }, true) // important to maintain partitions within the loop
    +        .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations
    +
    +      result.foreachPartition({ _ => }) // Forces the iteration to be calculated
    +    }
    +
    +    // calculate h(S)
    +    // s: number of elements
    +    // k: number of distinct classes
    +    // hs: entropy
    +
    +    val s  = l_total.value.values.reduce(_ + _)
    +    val hs = InfoTheory.entropy(l_total.value.values.toSeq, s)
    +    val k  = l_total.value.values.size
    +
    +    // select best threshold according to the criteria
    +    val final_candidates =
    +      result.flatMap({
    +        case (cand, _, l_freqs, r_freqs) =>
    +
    +          val k1  = l_freqs.size
    +          val s1  = if (k1 > 0) l_freqs.reduce(_ + _) else 0
    +          val hs1 = InfoTheory.entropy(l_freqs, s1)
    +
    +          val k2  = r_freqs.size
    +          val s2  = if (k2 > 0) r_freqs.reduce(_ + _) else 0
    +          val hs2 = InfoTheory.entropy(r_freqs, s2)
    +
    +          val weighted_hs = (s1 * hs1 + s2 * hs2) / s
    +          val gain        = hs - weighted_hs
    +          val delta       = Utils.log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2)
    +          var criterion   = (gain - (Utils.log2(s - 1) + delta) / s) > -1e-5
    +
    +          lastSelected match {
    +              case None =>
    +              case Some(last) => criterion = criterion && (cand != last)
    +          }
    +
    +          if (criterion) {
    +            Seq((weighted_hs, cand))
    +          } else {
    +            Seq.empty[(Double, Double)]
    +          }
    +      })
    +
    +    // choose best candidates and partition data to make recursive calls
    +    if (final_candidates.count > 0) {
    +      val selected_threshold = final_candidates.reduce({ case ((whs1, cand1), (whs2, cand2)) =>
    +        if (whs1 < whs2) (whs1, cand1) else (whs2, cand2)
    +      })._2;
    +      Some(selected_threshold)
    +    } else {
    +      None
    +    }
    +
    +  }
    +
    +  /**
    +   * Discretizes a value with a set of intervals.
    +   *
    +   * @param value The value to be discretized
    +   * @param thresholds Thresholds used to asign a discrete value
    +   */
    +  private def assignDiscreteValue(value: Double, thresholds: Seq[Double]) = {
    +    var aux = thresholds.zipWithIndex
    +    while (value > aux.head._1) aux = aux.tail
    +    aux.head._2
    +  }
    +
    +  /**
    +   * Discretizes an RDD of (label, array of values) pairs.
    +   */
    +  def discretize: RDD[LabeledPoint] = {
    +    // calculate thresholds that aren't already calculated
    +    getThresholdsForContinuousFeatures
    +
    +    val bc_thresholds = this.data.context.broadcast(this.thresholds)
    +
    +    // applies thresholds to discretize every continuous feature
    +    data.map {
    +      case LabeledPoint(label, values) =>
    +        LabeledPoint(label,
    +          values.zipWithIndex map {
    +            case (value, i) =>
    +              if (bc_thresholds.value.keySet contains i) {
    +                assignDiscreteValue(value, bc_thresholds.value(i))
    +              } else {
    +                value
    +              }
    +          })
    +    }
    +  }
    +
    --- End diff --
    
    Remove the empty line.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16053983
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +
    +  def generateLabeledData: Array[LabeledPoint] = {
    +    val rnd = new Random(42)
    +    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +
    +    Array.fill[LabeledPoint](nDatapoints) {
    +      LabeledPoint(labels(rnd.nextInt(nLabels)),
    +        Array.fill[Double](nFeatures)(rnd.nextDouble))
    --- End diff --
    
    Use `Vectors.dense(Array.fill....)` here.




[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10946987
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * DiscretizerModel provides a template with the basic methods for future discretizers.
    + */
    +trait DiscretizerModel extends Serializable {
    +  /**
    +   * Return the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double]
    --- End diff --
    
    Better to change the argument name to `featureIndex`.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11019751
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    +        val thresholdsForFeature = this.getThresholds(initial_candidates)
    +        this.thresholds += ((feature, thresholdsForFeature))
    +        thresholdsForFeature
    +    }
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
    +    for (feature <- features diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds.filter({ features.contains(_) })
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
    +    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds
    +  }
    +
    +  /**
    +   * Calculates the initial candidate treholds for a feature
    +   * @param data RDD of (value, label) pairs
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
    +   */
    +  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = ""
    +      var result = Seq.empty[(Double, Map[String, Int])]
    +      var freqs = Map.empty[String, Int]
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = freqs.empty + ((y, 1))
    +        } else {
    +          // we continue on the same interval
    +          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience 
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = {
    +
    +    //Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    //Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    --- End diff --
    
    This avoids having lots of partitions for just a few items. The algorithm analyzes successively smaller subsets of the initial candidate set, so the RDD has fewer elements on each iteration. If you don't coalesce the RDD, you end up with an RDD that has as many blocks as the original data but very few items.
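
The partition count passed to `coalesce` can be illustrated without Spark. This is only a minimal sketch, assuming the same ceiling formula as the `partitions` function in the diff and its default of 18000 elements per partition. `CoalesceSketch` and `partitionsFor` are hypothetical names used for this illustration, not part of the pull request.

```scala
object CoalesceSketch {
  // Hypothetical helper mirroring the `partitions` function in the diff:
  // the smallest number of partitions that holds `nCands` items with at
  // most `elementsPerPartition` items in each.
  def partitionsFor(nCands: Long, elementsPerPartition: Int): Int =
    math.ceil(nCands.toDouble / elementsPerPartition).toInt

  def main(args: Array[String]): Unit = {
    val elementsPerPartition = 18000 // default value taken from the diff
    // Candidate counts shrinking as the algorithm recurses on subsets
    // (illustrative numbers, not measurements).
    for (n <- Seq(1000000L, 40000L, 18000L, 500L)) {
      println(s"$n candidates -> ${partitionsFor(n, elementsPerPartition)} partition(s)")
    }
  }
}
```

As the candidate set shrinks, the target count drops toward 1. `RDD.coalesce` without a shuffle only reduces the number of partitions, so a target larger than the current count leaves the RDD as it is.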



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10898309
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala ---
    @@ -0,0 +1,60 @@
    +package org.apache.spark.mllib.discretization
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.util.Random
    +import org.apache.spark.mllib.util.InfoTheory
    +
    +object EMDDiscretizerSuite {
    +    val nFeatures = 5
    +    val nDatapoints = 50
    +    val nLabels = 3
    +    val nPartitions = 3
    +	    
    +	def generateLabeledData : Array[LabeledPoint] =
    +	{
    +        
    +	    val rnd = new Random(42)
    +	    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +	    	    
    +	    Array.fill[LabeledPoint](nDatapoints) {
    +	        LabeledPoint(labels(rnd.nextInt(nLabels)),
    +	                     Array.fill[Double](nFeatures)(rnd.nextDouble))
    +	    } 
    +	}
    +}
    +
    +class EMDDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +    test("EMD discretization") {
    +        val rnd = new Random()
    --- End diff --
    
    Use a fixed seed to make the test deterministic.
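
A minimal sketch of what the fixed seed buys, assuming nothing beyond `scala.util.Random`. `SeededData` and `sample` are hypothetical names used only for illustration; the seed 42 is the one already used by `generateLabeledData` in the diff.

```scala
import scala.util.Random

object SeededData {
  // Hypothetical helper: draws `n` doubles from a generator created with
  // an explicit seed, so repeated calls with the same seed return the
  // same sequence.
  def sample(seed: Long, n: Int): Seq[Double] = {
    val rnd = new Random(seed)
    Seq.fill(n)(rnd.nextDouble())
  }
}
```

Calling `SeededData.sample(42L, 5)` twice yields equal sequences, whereas `new Random()` with no argument picks a different seed on each run, so a failing test may not be reproducible.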



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947517
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    +        val thresholdsForFeature = this.getThresholds(initial_candidates)
    +        this.thresholds += ((feature, thresholdsForFeature))
    +        thresholdsForFeature
    +    }
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
    +    for (feature <- features diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds.filter({ features.contains(_) })
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
    +    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds
    +  }
    +
    +  /**
    +   * Calculates the initial candidate treholds for a feature
    +   * @param data RDD of (value, label) pairs
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
    +   */
    +  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = ""
    +      var result = Seq.empty[(Double, Map[String, Int])]
    +      var freqs = Map.empty[String, Int]
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = freqs.empty + ((y, 1))
    +        } else {
    +          // we continue on the same interval
    +          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience 
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = {
    +
    +    //Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    //Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Map[String, Int])],
    +      lastSelected : Option[Double]) = {
    +
    +    var result = candidates.map({
    +      case (cand, freqs) =>
    +        (cand, freqs, Seq.empty[Int], Seq.empty[Int])
    +    }).cache
    +
    +    val numPartitions = candidates.partitions.size
    +    val bc_numPartitions = candidates.context.broadcast(numPartitions)
    +
    +    // stores accumulated freqs from left to right
    +    val l_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +    // stores accumulated freqs from right to left
    +    val r_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +
    +    // calculates accumulated frequencies for each candidate
    +    (0 until numPartitions) foreach { l2r_i =>
    +
    +      val bc_l_total = l_total.value
    +      val bc_r_total = r_total.value
    +
    +      result =
    +        result.mapPartitionsWithIndex({ (slice, it) =>
    +
    +          val l2r = slice == l2r_i
    +          val r2l = slice == bc_numPartitions.value - 1 - l2r_i
    +
    +          if (l2r && r2l) {
    +
    +            // accumulate both from left to right and right to left
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +
    +            for ((cand, freqs, _, r_freqs) <- it) {
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            l_total += accum
    +
    +            val r2lIt = partialResult.iterator
    +            partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            accum = Map.empty[String, Int]
    +            for ((cand, freqs, l_freqs, _) <- r2lIt) {
    +              val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +            r_total += accum
    +
    +            partialResult.iterator
    +
    +          } else if (l2r) {
    +
    +            // accumulate freqs from left to right
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +
    +            for ((cand, freqs, _, r_freqs) <- it) {
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            l_total += accum
    +            partialResult.reverseIterator
    +
    +          } else if (r2l) {
    +
    +            // accumulate freqs from right to left
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +            val r2lIt = it.toSeq.reverseIterator
    +
    +            for ((cand, freqs, l_freqs, _) <- r2lIt) {
    +              val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            r_total += accum
    +
    +            partialResult.iterator
    +
    +          } else {
    +            // do nothing in this iteration
    +            it
    +          }
    +        }, true) // important to maintain partitions within the loop
    +        .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations
    +
    +      result.foreachPartition({ _ => }) // Forces the iteration to be calculated
    +    }
    +
    +    // calculate h(S)
    +    // s: number of elements
    +    // k: number of distinct classes
    +    // hs: entropy
    +
    +    val s  = l_total.value.values.reduce(_ + _)
    +    val hs = InfoTheory.entropy(l_total.value.values.toSeq, s)
    +    val k  = l_total.value.values.size
    +
    +    // select best threshold according to the criteria
    +    val final_candidates =
    +      result.flatMap({
    +        case (cand, _, l_freqs, r_freqs) =>
    +
    +          val k1  = l_freqs.size
    +          val s1  = if (k1 > 0) l_freqs.reduce(_ + _) else 0
    +          val hs1 = InfoTheory.entropy(l_freqs, s1)
    +
    +          val k2  = r_freqs.size
    +          val s2  = if (k2 > 0) r_freqs.reduce(_ + _) else 0
    +          val hs2 = InfoTheory.entropy(r_freqs, s2)
    +
    +          val weighted_hs = (s1 * hs1 + s2 * hs2) / s
    +          val gain        = hs - weighted_hs
    +          val delta       = Utils.log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2)
    +          var criterion   = (gain - (Utils.log2(s - 1) + delta) / s) > -1e-5
    +
    +          lastSelected match {
    +              case None =>
    +              case Some(last) => criterion = criterion && (cand != last)
    +          }
    +
    +          if (criterion) {
    +            Seq((weighted_hs, cand))
    +          } else {
    +            Seq.empty[(Double, Double)]
    +          }
    +      })
    +
    +    // choose best candidates and partition data to make recursive calls
    +    if (final_candidates.count > 0) {
    +      val selected_threshold = final_candidates.reduce({ case ((whs1, cand1), (whs2, cand2)) =>
    +        if (whs1 < whs2) (whs1, cand1) else (whs2, cand2)
    +      })._2;
    +      Some(selected_threshold)
    +    } else {
    +      None
    +    }
    +
    +  }
    +
    +  /**
    +   * Discretizes a value with a set of intervals.
    +   *
    +   * @param value The value to be discretized
    +   * @param thresholds Thresholds used to asign a discrete value
    +   */
    +  private def assignDiscreteValue(value: Double, thresholds: Seq[Double]) = {
    +    var aux = thresholds.zipWithIndex
    +    while (value > aux.head._1) aux = aux.tail
    +    aux.head._2
    +  }
    +
    +  /**
    +   * Discretizes an RDD of (label, array of values) pairs.
    +   */
    +  def discretize: RDD[LabeledPoint] = {
    +    // calculate thresholds that aren't already calculated
    +    getThresholdsForContinuousFeatures
    +
    +    val bc_thresholds = this.data.context.broadcast(this.thresholds)
    +
    +    // applies thresholds to discretize every continuous feature
    +    data.map {
    +      case LabeledPoint(label, values) =>
    --- End diff --
    
    Move this line up onto the previous one, i.e. `data.map { case LabeledPoint(label, values) =>`.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947993
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +private[discretization] object MapAccumulator extends AccumulatorParam[Map[String, Int]] {
    +
    +  def addInPlace(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = {
    +
    +    if (map1 isEmpty) {
    +      map2
    --- End diff --
    
    If `map2` is returned, it might get modified in the next `addInPlace`. The super method suggests only modifying and returning the first argument. So we had better copy everything from `map2` into `map1` in this case; basically, remove this `if` block.
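
One way to read that suggestion, sketched in plain Scala: always fold the second map into the first, with no special case for an empty `map1`. `FreqMaps.merge` is a hypothetical stand-alone function, not the PR's `MapAccumulator`, and it does not extend Spark's `AccumulatorParam`.

```scala
object FreqMaps {
  // Hypothetical stand-alone merge (not the PR's MapAccumulator).
  // Every entry of `map2` is added into `map1`; an empty `map1` needs no
  // special handling because the fold simply starts from it.
  def merge(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] =
    map2.foldLeft(map1) { case (acc, (label, count)) =>
      acc.updated(label, acc.getOrElse(label, 0) + count)
    }
}
```

Because the result is always built from `map1`, the function never hands back the `map2` instance itself, which is the situation the comment warns about.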



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11168845
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +
    +/**
    + * This class contains methods to calculate thresholds to discretize continuous values with the
    + * method proposed by Fayyad and Irani in Multi-Interval Discretization of Continuous-Valued
    + * Attributes (1993).
    + *
    + * @param continuousFeaturesIndexes Indexes of features to be discretized.
    + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition.
    + * @param maxBins Maximum number of bins for each discretized feature.
    + */
    +class EntropyMinimizationDiscretizer private (
    +    val continuousFeaturesIndexes: Seq[Int],
    +    val elementsPerPartition: Int,
    +    val maxBins: Int)
    +  extends Serializable {
    +
    +  private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +  private val log2 = { x: Double => math.log(x) / math.log(2) }
    +
    +  /**
    +   * Run the algorithm with the configured parameters on an input.
    +   * @param data RDD of LabeledPoint's.
    +   * @return A EntropyMinimizationDiscretizerModel with the thresholds to discretize.
    +   */
    +  def run(data: RDD[LabeledPoint]) = {
    +    val labels2Int = data.context.broadcast(data.map(_.label).distinct.collect.zipWithIndex.toMap)
    +    val nLabels = labels2Int.value.size
    +
    +    var thresholds = Map.empty[Int, Seq[Double]]
    +    for (i <- continuousFeaturesIndexes) {
    +      val featureValues = data.map({
    +        case LabeledPoint(label, values) => (values(i), labels2Int.value(label))
    +      })
    +      val sortedValues = featureValues.sortByKey()
    +      val initialCandidates = initialThresholds(sortedValues, nLabels)
    +      val thresholdsForFeature = this.getThresholds(initialCandidates, nLabels)
    +      thresholds += ((i, thresholdsForFeature))
    +    }
    +
    +    new EntropyMinimizationDiscretizerModel(thresholds)
    +
    +  }
    +
    +  /**
    +   * Calculates the initial candidate treholds for a feature
    +   * @param data RDD of (value, label) pairs.
    +   * @param nLabels Number of distinct labels in the dataset.
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs.
    +   */
    +  private def initialThresholds(data: RDD[(Double, Int)], nLabels: Int) = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = Int.MinValue
    +      var result = Seq.empty[(Double, Array[Long])]
    +      var freqs = Array.fill[Long](nLabels)(0L)
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = Array.fill[Long](nLabels)(0L)
    +          freqs(y) = 1L
    +        } else {
    +          // we continue on the same interval
    +          freqs(y) += 1
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Array[Long])], nLabels: Int): Seq[Double] = {
    +
    +    // Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    // Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh, nLabels) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Array[Long])],
    +      lastSelected : Option[Double],
    +      nLabels: Int) = {
    +
    +    var result = candidates.map({
    +      case (cand, freqs) =>
    +        (cand, freqs, Array.empty[Long], Array.empty[Long])
    +    }).cache
    +
    +    val numPartitions = candidates.partitions.size
    +    val bcNumPartitions = candidates.context.broadcast(numPartitions)
    +
    +    // stores accumulated freqs from left to right
    +    val bcLeftTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator)
    +    // stores accumulated freqs from right to left
    +    val bcRightTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator)
    +
    +    // calculates accumulated frequencies for each candidate
    +    (0 until numPartitions) foreach { l2rIndex =>
    +
    +      val leftTotal = bcLeftTotal.value
    +      val rightTotal = bcRightTotal.value
    +
    +      result =
    +        result.mapPartitionsWithIndex({ (slice, it) =>
    +
    +          val l2r = slice == l2rIndex
    +          val r2l = slice == bcNumPartitions.value - 1 - l2rIndex
    +
    +          if (l2r && r2l) {
    +
    +            // accumulate both from left to right and right to left
    +            var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])]
    +            var accum = leftTotal
    +
    +            for ((cand, freqs, _, rightFreqs) <- it) {
    +              for (i <- 0 until nLabels) accum(i) += freqs(i)
    +              partialResult = (cand, freqs, accum.clone, rightFreqs) +: partialResult
    +            }
    +
    +            bcLeftTotal += accum
    +
    +            val r2lIt = partialResult.iterator
    +            partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])]
    +            accum = Array.fill[Long](nLabels)(0L)
    +
    +            for ((cand, freqs, leftFreqs, _) <- r2lIt) {
    +              partialResult = (cand, freqs, leftFreqs, accum.clone) +: partialResult
    +              for (i <- 0 until nLabels) accum(i) += freqs(i)
    +            }
    +
    +            bcRightTotal += accum
    +
    +            partialResult.iterator
    +
    +          } else if (l2r) {
    +
    +            // accumulate freqs from left to right
    +            var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])]
    +            val accum = leftTotal
    +
    +            for ((cand, freqs, _, rightFreqs) <- it) {
    +              for (i <- 0 until nLabels) accum(i) += freqs(i)
    +              partialResult = (cand, freqs, accum.clone, rightFreqs) +: partialResult
    +            }
    +
    +            bcLeftTotal += accum
    +            partialResult.reverseIterator
    +
    +          } else if (r2l) {
    +
    +            // accumulate freqs from right to left
    +            val r2lIt = it.toSeq.reverseIterator
    +
    +            var partialResult = Seq.empty[(Double, Array[Long], Array[Long], Array[Long])]
    +            val accum = rightTotal
    +
    +            for ((cand, freqs, leftFreqs, _) <- r2lIt) {
    +              partialResult = (cand, freqs, leftFreqs, accum.clone) +: partialResult
    +              for (i <- 0 until nLabels) accum(i) += freqs(i)
    +            }
    +
    +            bcRightTotal += accum
    +
    +            partialResult.iterator
    +
    +          } else {
    +            // do nothing in this iteration
    +            it
    +          }
    +        }, true) // important to maintain partitions within the loop
    +        .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations
    +
    +      result.foreachPartition({ _ => }) // Forces the iteration to be calculated
    +
    +    }
    +
    +    // calculate h(S)
    +    // s: number of elements
    +    // k: number of distinct classes
    +    // hs: entropy
    +
    +    val s  = bcLeftTotal.value.reduce(_ + _)
    +    val hs = InfoTheory.entropy(bcLeftTotal.value.toSeq, s)
    +    val k  = bcLeftTotal.value.filter(_ != 0).size
    +
    +    // select best threshold according to the criteria
    +    val finalCandidates =
    +      result.flatMap({
    +        case (cand, _, leftFreqs, rightFreqs) =>
    +
    +          val k1  = leftFreqs.filter(_ != 0).size
    +          val s1  = if (k1 > 0) leftFreqs.reduce(_ + _) else 0
    +          val hs1 = InfoTheory.entropy(leftFreqs, s1)
    +
    +          val k2  = rightFreqs.filter(_ != 0).size
    +          val s2  = if (k2 > 0) rightFreqs.reduce(_ + _) else 0
    +          val hs2 = InfoTheory.entropy(rightFreqs, s2)
    +
    +          val weightedHs = (s1 * hs1 + s2 * hs2) / s
    +          val gain        = hs - weightedHs
    +          val delta       = log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2)
    --- End diff --
    
    `^` is bitwise XOR in Scala, not exponentiation. I assume that you need `math.pow` here. If I'm wrong, please put a comment before this line explaining the bitwise operation.
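
The difference is easy to check in isolation. The sketch below is an assumption about the intended formula, not the PR's code: `MdlpDelta`, `xorTerm`, `powTerm` and `delta` are hypothetical names, and `delta` follows the shape of the line under review with `math.pow` substituted for `^`.

```scala
object MdlpDelta {
  private def log2(x: Double): Double = math.log(x) / math.log(2)

  // What the diff computes: in Scala `-` binds tighter than `^`,
  // so `3 ^ k - 2` parses as 3 XOR (k - 2).
  def xorTerm(k: Int): Int = 3 ^ k - 2

  // What the formula presumably intends: 3 raised to the power k, minus 2.
  def powTerm(k: Int): Double = math.pow(3, k) - 2

  // Same shape as the `delta` line in the diff, using powTerm instead of XOR.
  def delta(k: Int, hs: Double, k1: Int, hs1: Double, k2: Int, hs2: Double): Double =
    log2(powTerm(k)) - (k * hs - k1 * hs1 - k2 * hs2)
}
```

For `k = 3` the XOR form evaluates to 2 while the power form evaluates to 25.0, so the two versions feed very different values into `log2`.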



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38607525
  
    Merged build finished.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-81983258
  
    @LIDIAgroup Sorry that I don't have enough bandwidth to review this PR. Since there are unresolved performance issues, do you mind closing this PR for now? I recommend registering your implementation as a package on spark-packages.org, so users can use it and send feedback to you. Thanks for contributing to MLlib!



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11167980
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/ArrayAccumulator.scala ---
    @@ -0,0 +1,32 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +private[discretization] object ArrayAccumulator extends AccumulatorParam[Array[Long]] {
    +
    +  def addInPlace(array1: Array[Long], array2: Array[Long]): Array[Long] = {
    +    array2.clone
    --- End diff --
    
    `array1` is dropped and a copy of `array2` is returned. Is that what you want?
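
For comparison, a minimal sketch of an element-wise merge, assuming the accumulator is meant to sum class-frequency arrays. The PR does not confirm that intent, and the object name is hypothetical; the function is a plain stand-in for `addInPlace` that does not depend on Spark.

```scala
object ArraySumSketch {
  // Adds array2 into array1 element by element and returns array1,
  // so neither operand's contribution is lost.
  def addInPlace(array1: Array[Long], array2: Array[Long]): Array[Long] = {
    require(array1.length == array2.length, "arrays must have the same length")
    var i = 0
    while (i < array1.length) {
      array1(i) += array2(i)
      i += 1
    }
    array1
  }
}
```

If the accumulator is instead meant to hold only the most recent array, the behaviour in the diff would be deliberate and would deserve a comment saying so.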



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16053704
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +
    +/**
    + * This class contains methods to calculate thresholds to discretize continuous values with the
    + * method proposed by Fayyad and Irani in Multi-Interval Discretization of Continuous-Valued
    + * Attributes (1993).
    + *
    + * @param continuousFeaturesIndexes Indexes of features to be discretized.
    + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition.
    + * @param maxBins Maximum number of bins for each discretized feature.
    + */
    +class EntropyMinimizationDiscretizer private (
    +    val continuousFeaturesIndexes: Seq[Int],
    +    val elementsPerPartition: Int,
    +    val maxBins: Int)
    +  extends Serializable {
    +
    +  private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +  private val log2 = { x: Double => math.log(x) / math.log(2) }
    +
    +  /**
    +   * Run the algorithm with the configured parameters on an input.
    +   * @param data RDD of LabeledPoint's.
    +   * @return A EntropyMinimizationDiscretizerModel with the thresholds to discretize.
    +   */
    +  def run(data: RDD[LabeledPoint]) = {
    +    val labels2Int = data.context.broadcast(data.map(_.label).distinct.collect.zipWithIndex.toMap)
    +    val nLabels = labels2Int.value.size
    +
    +    var thresholds = Map.empty[Int, Seq[Double]]
    +    for (i <- continuousFeaturesIndexes) {
    +      val featureValues = data.map({
    +        case LabeledPoint(label, values) => (values(i), labels2Int.value(label))
    --- End diff --
    
    Change `values(i)` to `values.toArray(i)`.




[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10941658
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    --- End diff --
    
    A discretizer may be trained from one RDD but applied to another. I would recommend separating `Discretizer` from data by having a `train` method that takes an RDD as input and a `discretize` method to discretize an RDD of the same element type/dimension.
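
The separation suggested here could look roughly like the sketch below. Everything in it is hypothetical: `Point`, `ThresholdModel`, and `EqualWidthDiscretizer` are illustrative names, `Seq` stands in for `RDD`, and simple equal-width binning is swapped in for entropy minimization to keep the example short.

```scala
final case class Point(label: Double, features: Array[Double])

// Holds only the learned thresholds, so it can be applied to any data set
// with the same dimension as the training data.
class ThresholdModel(val thresholds: Map[Int, Seq[Double]]) {
  def discretize(p: Point): Point = {
    val out = p.features.zipWithIndex.map { case (v, i) =>
      thresholds.get(i) match {
        case Some(ths) => ths.count(_ < v).toDouble // bin index
        case None      => v                         // feature left untouched
      }
    }
    Point(p.label, out)
  }

  def discretize(data: Seq[Point]): Seq[Point] = data.map(p => discretize(p))
}

object EqualWidthDiscretizer {
  // Training sees the data once and returns a model; it keeps no reference to the data.
  def train(data: Seq[Point], featureIndexes: Seq[Int], bins: Int): ThresholdModel = {
    require(data.nonEmpty && bins >= 1)
    val thresholds = featureIndexes.map { i =>
      val column = data.map(_.features(i))
      val (lo, hi) = (column.min, column.max)
      val width = (hi - lo) / bins
      i -> (1 until bins).map(j => lo + j * width)
    }.toMap
    new ThresholdModel(thresholds)
  }
}
```

A model trained on one collection can then be applied to another, for example `EqualWidthDiscretizer.train(trainingData, Seq(0), 2).discretize(otherData)`.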



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10898273
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala ---
    @@ -0,0 +1,60 @@
    +package org.apache.spark.mllib.discretization
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.util.Random
    +import org.apache.spark.mllib.util.InfoTheory
    +
    +object EMDDiscretizerSuite {
    +    val nFeatures = 5
    --- End diff --
    
    Please fix indentation in this file.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11168529
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,317 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +
    +/**
    + * This class contains methods to calculate thresholds to discretize continuous values with the
    + * method proposed by Fayyad and Irani in Multi-Interval Discretization of Continuous-Valued
    + * Attributes (1993).
    + *
    + * @param continuousFeaturesIndexes Indexes of features to be discretized.
    + * @param elementsPerPartition Maximum number of thresholds to treat in each RDD partition.
    + * @param maxBins Maximum number of bins for each discretized feature.
    + */
    +class EntropyMinimizationDiscretizer private (
    +    val continuousFeaturesIndexes: Seq[Int],
    +    val elementsPerPartition: Int,
    +    val maxBins: Int)
    +  extends Serializable {
    +
    +  private val partitions = { x: Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +  private val log2 = { x: Double => math.log(x) / math.log(2) }
    +
    +  /**
    +   * Run the algorithm with the configured parameters on an input.
    +   * @param data RDD of LabeledPoint's.
    +   * @return A EntropyMinimizationDiscretizerModel with the thresholds to discretize.
    +   */
    +  def run(data: RDD[LabeledPoint]) = {
    +    val labels2Int = data.context.broadcast(data.map(_.label).distinct.collect.zipWithIndex.toMap)
    +    val nLabels = labels2Int.value.size
    +
    +    var thresholds = Map.empty[Int, Seq[Double]]
    +    for (i <- continuousFeaturesIndexes) {
    +      val featureValues = data.map({
    +        case LabeledPoint(label, values) => (values(i), labels2Int.value(label))
    +      })
    +      val sortedValues = featureValues.sortByKey()
    +      val initialCandidates = initialThresholds(sortedValues, nLabels)
    +      val thresholdsForFeature = this.getThresholds(initialCandidates, nLabels)
    +      thresholds += ((i, thresholdsForFeature))
    +    }
    +
    +    new EntropyMinimizationDiscretizerModel(thresholds)
    +
    +  }
    +
    +  /**
    +   * Calculates the initial candidate thresholds for a feature
    +   * @param data RDD of (value, label) pairs.
    +   * @param nLabels Number of distinct labels in the dataset.
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs.
    +   */
    +  private def initialThresholds(data: RDD[(Double, Int)], nLabels: Int) = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = Int.MinValue
    +      var result = Seq.empty[(Double, Array[Long])]
    +      var freqs = Array.fill[Long](nLabels)(0L)
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = Array.fill[Long](nLabels)(0L)
    +          freqs(y) = 1L
    +        } else {
    +          // we continue on the same interval
    +          freqs(y) += 1
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Array[Long])], nLabels: Int): Seq[Double] = {
    +
    +    // Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    // Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh, nLabels) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Array[Long])],
    +      lastSelected : Option[Double],
    +      nLabels: Int) = {
    +
    +    var result = candidates.map({
    +      case (cand, freqs) =>
    +        (cand, freqs, Array.empty[Long], Array.empty[Long])
    +    }).cache
    +
    +    val numPartitions = candidates.partitions.size
    +    val bcNumPartitions = candidates.context.broadcast(numPartitions)
    +
    +    // stores accumulated freqs from left to right
    +    val bcLeftTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator)
    +    // stores accumulated freqs from right to left
    +    val bcRightTotal = candidates.context.accumulator(Array.fill(nLabels)(0L))(ArrayAccumulator)
    +
    +    // calculates accumulated frequencies for each candidate
    +    (0 until numPartitions) foreach { l2rIndex =>
    --- End diff --
    
    This block is still quite complicated. Let's say you have the following partitioned frequency sequence:
    ~~~
    1 2 1 3 || 1 1 1 1 || 1 2 3 2
    ~~~
    From left to right, you want
    ~~~
    1 3 4 7 || 8 9 10 11 || 12 14 17 19
    ~~~
    From right to left, you want
    ~~~
    19 18 16 15 || 12 11 10 9 || 8 7 5 2
    ~~~
    This can be done by first computing partition-wise sums:
    ~~~
    7 || 4 || 8
    ~~~
    Then compute cumsums from left to right
    ~~~
    0 || 7 || 11 || 19
    ~~~
    and from right to left
    ~~~
    19 || 12 || 8 || 0
    ~~~
    The global cumsum becomes, from left to right
    ~~~
    0 + cumsum(1, 2, 1, 3) || 7 + cumsum(1, 1, 1, 1) || 11 + cumsum(1, 2, 3, 2)
    ~~~
    from right to left
    ~~~
    19 - cumsum(0, 1, 2, 1) || 12 - cumsum(0, 1, 1, 1) || 8 - cumsum(0, 1, 2, 3)
    ~~~
    I may have missed some details, but two jobs should be sufficient for this computation.
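
The two-pass scheme described in this comment can be sketched on plain collections as follows. This is an illustration only: a `Seq[Seq[Long]]` stands in for the partitions of an RDD, scalar counts stand in for the per-class frequency arrays, and the object and method names are hypothetical.

```scala
object PartitionedCumSum {
  // Returns (left-to-right cumulative sums, right-to-left cumulative sums),
  // keeping the original partition structure.
  def cumulative(parts: Seq[Seq[Long]]): (Seq[Seq[Long]], Seq[Seq[Long]]) = {
    // Pass 1: one sum per partition, e.g. 7 || 4 || 8.
    val sums = parts.map(_.sum)

    // Driver side: offsets for each partition.
    val leftOffsets  = sums.scanLeft(0L)(_ + _)  // 0, 7, 11, 19
    val rightOffsets = sums.scanRight(0L)(_ + _) // 19, 12, 8, 0

    // Pass 2: local cumulative sums shifted by the partition's offset.
    val leftToRight = parts.zip(leftOffsets).map { case (p, off) =>
      p.scanLeft(off)(_ + _).tail
    }
    val rightToLeft = parts.zip(rightOffsets).map { case (p, off) =>
      p.scanLeft(off)(_ - _).init
    }
    (leftToRight, rightToLeft)
  }
}
```

On the sequence from the comment, `1 2 1 3 || 1 1 1 1 || 1 2 3 2`, this reproduces both cumulative sequences shown above. In Spark, each pass would presumably map onto one job over the partitions.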



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16053912
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerModel.scala ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * This class provides the methods to discretize data with the given thresholds.
    + * @param thresholds Thresholds used to discretize.
    + */
    +class EntropyMinimizationDiscretizerModel (val thresholds: Map[Int, Seq[Double]])
    +  extends DiscretizerModel[LabeledPoint] with Serializable {
    +
    +  /**
    +   * Discretizes values for the given data set using the model trained.
    +   *
    +   * @param data Data point to discretize.
    +   * @return Data point with values discretized
    +   */
    +  override def discretize(data: LabeledPoint): LabeledPoint = {
    +    val newValues = data.features.zipWithIndex.map({ case (value, i) =>
    +      if (this.thresholds.keySet contains i) {
    +        assignDiscreteValue(value, thresholds(i))
    +      } else {
    +        value
    +      }
    +    })
    +    LabeledPoint(data.label, newValues)
    +  }
    +
    +  /**
    +   * Discretizes values for the given data set using the model trained.
    +   *
    +   * @param data RDD representing data points to discretize.
    +   * @return RDD with values discretized
    +   */
    +  override def discretize(data: RDD[LabeledPoint]): RDD[LabeledPoint] = {
    +    val bc_thresholds = data.context.broadcast(this.thresholds)
    +
    +    // applies thresholds to discretize every continuous feature
    +    data.map({ case LabeledPoint(label, values) =>
    +      val newValues = values.zipWithIndex.map({ case (value, i) =>
    +        if (bc_thresholds.value.keySet contains i) {
    +          assignDiscreteValue(value, bc_thresholds.value(i))
    +        } else {
    +          value
    +        }
    +      })
    +      LabeledPoint(label, newValues)
    --- End diff --
    
    Same as above. You also need to import `org.apache.spark.mllib.linalg.Vectors`.




[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-51784545
  
    @mengxr I've tested the code on a few examples after making it compatible with the current version of `LabeledPoint`. It seems to work and produces results similar to those of Weka's discretizer. However, I encountered performance issues: 2000 instances with 1000 features take several minutes to process, and 2000 instances with 5000 features take more than 10 minutes. Bayesian and SVM classification run on the same machine on the order of seconds. I think optimization is needed.




[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10951721
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +	    
    +	def generateLabeledData : Array[LabeledPoint] = {
    --- End diff --
    
    Fix indentation in this file.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38472286
  
    Merged build started.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10941200
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    --- End diff --
    
    organize imports



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11065687
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.util
    +
    +import org.apache.spark.mllib.discretization.Utils
    +
    +/**
    + * Object with some Information Theory methods.
    + */
    +object InfoTheory {
    --- End diff --
    
    The `Seq[Int]` passed to `entropy` contains the frequencies of each class of the data set. So unless you are doing some probabilistic inference of the frequencies, they should be integers. But I agree that it would be better for `InfoTheory` to have its own PR. For now, I'm going to place it under the `discretization` package, as you suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10969423
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.util
    +
    +import org.apache.spark.mllib.discretization.Utils
    +
    +/**
    + * Object with some Information Theory methods.
    + */
    +object InfoTheory {
    --- End diff --
    
    I placed it in `util` because the intention is to implement Information Theory functions that can be used by any algorithm. For instance, the implemented `entropy` functions compute the entropy of a variable whose values have the frequencies stored in `freqs`. This is a general function that is used in the discretizer but could potentially be used in more cases.
    
    If you don't agree with this, it will be no problem to move it to the `discretization` package.
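
The `entropy` function discussed in this comment is not shown in the quoted diff, so the following is only a minimal sketch of the idea, assuming Shannon entropy in bits (base-2 logarithm) over a `Seq[Int]` of class frequencies. The object name `EntropySketch` and the choice of log base are assumptions for illustration, not the PR's actual code.

```scala
object EntropySketch {
  private val Log2 = math.log(2.0)

  /** Shannon entropy (in bits) of a variable whose values occur `freqs` times. */
  def entropy(freqs: Seq[Int]): Double = {
    // Sum as Long so large counts cannot overflow Int.
    val total = freqs.map(_.toLong).sum
    if (total == 0L) {
      0.0
    } else {
      // Zero frequencies contribute nothing and are skipped to avoid log(0).
      freqs.filter(_ > 0).map { f =>
        val p = f.toDouble / total
        -p * math.log(p) / Log2
      }.sum
    }
  }
}
```

Because the input is only a sequence of counts, a function of this shape does not depend on the discretizer, which is the generality argument made above.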



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10951631
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/InfoTheory.scala ---
    @@ -0,0 +1,49 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.util
    +
    +import org.apache.spark.mllib.discretization.Utils
    +
    +/**
    + * Object with some Information Theory methods.
    + */
    +object InfoTheory {
    --- End diff --
    
    Similar concern here. We don't want to expose a public `InfoTheory` object under `mllib.util`. Let's move it to `EMD` and mark it private for now.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38881855
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by avulanov <gi...@git.apache.org>.
Github user avulanov commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r16054017
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +
    +  def generateLabeledData: Array[LabeledPoint] = {
    +    val rnd = new Random(42)
    +    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +
    +    Array.fill[LabeledPoint](nDatapoints) {
    +      LabeledPoint(labels(rnd.nextInt(nLabels)),
    +        Array.fill[Double](nFeatures)(rnd.nextDouble))
    +    }
    +  }
    +}
    +
    +class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +  test("EMD discretization") {
    +    val rnd = new Random(13)
    +        
    +    val data = for (i <- 1 to 99) yield
    +      if (i <= 33) {
    +        LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +      } else if (i <= 66) {
    +        LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    --- End diff --
    
    same as above



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/216





[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38559061
  
    We've tried to follow all the suggestions made by @mengxr. If you feel that we should make any other change, please don't hesitate to tell us; we are willing to discuss it.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10898196
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala ---
    @@ -0,0 +1,53 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +object MapAccumulator extends AccumulatorParam[Map[String, Int]] {
    +
    +  def addInPlace(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = {
    +    if (map1 isEmpty) {
    +      map2
    +    } else if (map2 isEmpty) {
    +      map1
    +    } else {
    +      var result = Map.empty[String, Int]
    +      for ((y1, x1) <- map1; (y2, x2) <- map2) {
    +        if (y1.trim() == y2.trim()) {
    --- End diff --
    
    This has O(n^2) complexity. If the purpose is to merge counters, you should trim the keys before adding them to the counter, and then merge the other counter in place.
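
The suggestion above can be sketched as follows. This is an illustrative sketch rather than the code that ended up in the PR: `CounterMergeSketch`, `trimKeys`, and `merge` are hypothetical names, and because the sketch keeps the immutable `Map` from the quoted diff, the merge builds an updated map instead of mutating one literally in place.

```scala
object CounterMergeSketch {
  /** Normalizes keys once, when a counter is built, summing counts of keys that collide. */
  def trimKeys(counts: Map[String, Int]): Map[String, Int] =
    counts.foldLeft(Map.empty[String, Int]) { case (acc, (key, n)) =>
      val k = key.trim
      acc.updated(k, acc.getOrElse(k, 0) + n)
    }

  /** Merges two counters with one lookup per entry of the smaller map. */
  def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] = {
    val (small, large) = if (a.size <= b.size) (a, b) else (b, a)
    small.foldLeft(large) { case (acc, (key, n)) =>
      acc.updated(key, acc.getOrElse(key, 0) + n)
    }
  }
}
```

With keys trimmed up front, the merge never compares every key of one map against every key of the other, so the nested loop in the quoted `addInPlace` is no longer needed.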



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947120
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    --- End diff --
    
    camelCase for variable names.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38472118
  
    Jenkins, test this please.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10953908
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    +        val thresholdsForFeature = this.getThresholds(initial_candidates)
    +        this.thresholds += ((feature, thresholdsForFeature))
    +        thresholdsForFeature
    +    }
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
    +    for (feature <- features diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds.filter({ features.contains(_) })
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
    +    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds
    +  }
    +
    +  /**
    +   * Calculates the initial candidate thresholds for a feature
    +   * @param data RDD of (value, label) pairs
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
    +   */
    +  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = ""
    +      var result = Seq.empty[(Double, Map[String, Int])]
    +      var freqs = Map.empty[String, Int]
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = freqs.empty + ((y, 1))
    +        } else {
    +          // we continue on the same interval
    +          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience 
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = {
    +
    +    //Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    //Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Map[String, Int])],
    +      lastSelected : Option[Double]) = {
    +
    +    var result = candidates.map({
    +      case (cand, freqs) =>
    +        (cand, freqs, Seq.empty[Int], Seq.empty[Int])
    +    }).cache
    +
    +    val numPartitions = candidates.partitions.size
    +    val bc_numPartitions = candidates.context.broadcast(numPartitions)
    +
    +    // stores accumulated freqs from left to right
    +    val l_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +    // stores accumulated freqs from right to left
    +    val r_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +
    +    // calculates accumulated frequencies for each candidate
    +    (0 until numPartitions) foreach { l2r_i =>
    --- End diff --
    
    The implementation here is really hard to understand. It seems to me that you need 
    
    ~~~
    SparkContext#runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U]
    ~~~
    
    to get the sum per partition, then compute the partition-wise cumulative sum on the master, so that each partition knows the number to start from. Maybe I misunderstand this block.
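
The three steps described above can be sketched without Spark by letting a nested `Seq` stand in for the partitions of the RDD. This is a simulation under that assumption, not the PR's implementation: `PartitionCumSumSketch`, `add`, and `leftCumulative` are hypothetical names, and step 1 is only the local analogue of what a `runJob` call over the partitions would return.

```scala
object PartitionCumSumSketch {
  type Freqs = Map[String, Int]

  /** Adds two class-frequency maps key by key. */
  def add(a: Freqs, b: Freqs): Freqs =
    b.foldLeft(a) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }

  /**
   * `partitions` stands in for the partitions of an RDD of class-frequency maps.
   * Returns, for every element, the frequencies accumulated from the left
   * up to and including that element.
   */
  def leftCumulative(partitions: Seq[Seq[Freqs]]): Seq[Seq[Freqs]] = {
    // Step 1: one total per partition.
    val totals = partitions.map(_.foldLeft(Map.empty[String, Int])(add))
    // Step 2: prefix sums of the totals give each partition its starting offset.
    val offsets = totals.scanLeft(Map.empty[String, Int])(add).init
    // Step 3: each partition computes a local running sum starting from its offset.
    partitions.zip(offsets).map { case (part, offset) =>
      part.scanLeft(offset)(add).tail
    }
  }
}
```

The right-to-left accumulation could follow the same pattern on reversed input, which replaces the partition-by-partition accumulator loop with a constant number of passes.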



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38592748
  
    Merged build started.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10898245
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala ---
    @@ -0,0 +1,60 @@
    +package org.apache.spark.mllib.discretization
    --- End diff --
    
    Need Apache header.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38488886
  
    @LIDIAgroup Thanks for updating https://github.com/apache/incubator-spark/pull/541
    
    There are some style problems that cause the test build to fail. You can use `dev/run-tests` to run all Jenkins tests locally.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38622854
  
    @LIDIAgroup, I made one pass through the code. My major concern is the complexity of the algorithm. Could you help answer the following questions?
    
    0. What is the time complexity of the original algorithm if we assume data is local?
    
    1. What is the time complexity of this implementation?
    
    2. How many passes over the data does it need?
    
    3. How much data do you cache/persist?
    
    For code style, please follow the Spark Code Style Guide and update your code. The major issues are
    
    1. use camelCase for variable names
    
    2. do not use infix notation except for operators
    
    3. 2-space indentation
    
    4. use `map { case .. =>` instead of making a newline for `case .. =>`
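
The four style points listed above can be illustrated with a short sketch. `StyleSketch`, `Point`, `featureWithLabel`, and `isEmptyCounter` are hypothetical names chosen for illustration; `Point` only stands in for `LabeledPoint` so that the example does not depend on Spark.

```scala
object StyleSketch {
  final case class Point(label: Double, features: Seq[Double])

  def featureWithLabel(points: Seq[Point], feature: Int): Seq[(Double, String)] = {
    // camelCase name, 2-space indentation, and `map { case ... =>` on one line
    val featureValues = points.map { case Point(label, values) =>
      (values(feature), label.toString.trim)
    }
    // dot notation for method calls instead of infix
    featureValues.sortBy(_._1)
  }

  // infix notation is kept for operators such as `||` and `==`
  def isEmptyCounter(counts: Map[String, Int]): Boolean =
    counts.isEmpty || counts.values.sum == 0
}
```

Compared with the quoted diff, this would mean writing `map1.isEmpty` rather than `map1 isEmpty` and `initialCandidates` rather than `initial_candidates`.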
    
    For interfaces, we try to make the number of public interfaces minimal because we are going to maintain public APIs. If a method is not intended for end users, please mark it private.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10897344
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/DiscretizerModel.scala ---
    @@ -0,0 +1,47 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.rdd.RDD
    +
    +trait DiscretizerModel extends Serializable {
    --- End diff --
    
    Need doc for public traits.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-39346452
  
    I've followed your suggestion of using `runJob`. Now `evalThresholds` looks much simpler and the result is the same. Thanks for your help.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38472285
  
    Merged build triggered.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38472412
  
    Merged build finished.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947486
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    +    var maxBins: Int = Int.MaxValue)
    +  extends DiscretizerModel {
    +
    +  private var thresholds = Map.empty[Int, Seq[Double]]
    +  private val partitions = { x : Long => math.ceil(x.toDouble / elementsPerPartition).toInt }
    +
    +  def this() = this(null, null)
    +
    +  /**
    +   * Sets the RDD[LabeledPoint] to be discretized
    +   *
    +   * @param data RDD[LabeledPoint] to be discretized
    +   */
    +  def setData(data: RDD[LabeledPoint]): EntropyMinimizationDiscretizer = {
    +    this.data = data
    +    this
    +  }
    +
    +  /**
    +   * Sets the indexes of the features to be discretized
    +   *
    +   * @param continuousFeatures Indexes of features to be discretized
    +   */
    +  def setContinuousFeatures(continuousFeatures: Seq[Int]): EntropyMinimizationDiscretizer = {
    +    this.continousFeatures = continuousFeatures
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of elements that a partition should have
    +   *
    +   * @param ratio Maximum number of elements for a partition
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setElementsPerPartition(ratio: Int): EntropyMinimizationDiscretizer = {
    +    this.elementsPerPartition = ratio
    +    this
    +  }
    +
    +  /**
    +   * Sets the maximum number of discrete values
    +   *
    +   * @param maxBins Maximum number of discrete values
    +   * @return The Discretizer with the parameter changed
    +   */
    +  def setMaxBins(maxBins: Int): EntropyMinimizationDiscretizer = {
    +    this.maxBins = maxBins
    +    this
    +  }
    +  
    +  /**
    +   * Returns the thresholds used to discretized the given feature
    +   *
    +   * @param feature The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(feature: Int): Seq[Double] = {
    +    thresholds.get(feature) match {
    +      case Some(a) => a
    +      case None =>
    +        val featureValues = data.map({
    +          case LabeledPoint(label, values) => (values(feature), label.toString.trim)
    +        })
    +        val sortedValues = featureValues.sortByKey()
    +        val initial_candidates = initialThresholds(sortedValues)
    +        val thresholdsForFeature = this.getThresholds(initial_candidates)
    +        this.thresholds += ((feature, thresholdsForFeature))
    +        thresholdsForFeature
    +    }
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the given features
    +   *
    +   * @param features The number of the feature to discretize
    +   */
    +  def getThresholdsForFeature(features: Seq[Int]): Map[Int, Seq[Double]] = {
    +    for (feature <- features diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds.filter({ features.contains(_) })
    +  }
    +
    +  /**
    +   * Returns the thresholds used to discretized the continuous features
    +   */
    +  def getThresholdsForContinuousFeatures: Map[Int, Seq[Double]] = {
    +    for (feature <- continousFeatures diff this.thresholds.keys.toSeq) {
    +      getThresholdsForFeature(feature)
    +    }
    +
    +    this.thresholds
    +  }
    +
    +  /**
    +   * Calculates the initial candidate treholds for a feature
    +   * @param data RDD of (value, label) pairs
    +   * @return RDD of (candidate, class frequencies between last and current candidate) pairs
    +   */
    +  private def initialThresholds(data: RDD[(Double, String)]): RDD[(Double, Map[String,Int])] = {
    +    data.mapPartitions({ it =>
    +      var lastX = Double.NegativeInfinity
    +      var lastY = ""
    +      var result = Seq.empty[(Double, Map[String, Int])]
    +      var freqs = Map.empty[String, Int]
    +
    +      for ((x, y) <- it) {
    +        if (x != lastX && y != lastY && lastX != Double.NegativeInfinity) {
    +          // new candidate and interval
    +          result = ((x + lastX) / 2, freqs) +: result
    +          freqs = freqs.empty + ((y, 1))
    +        } else {
    +          // we continue on the same interval
    +          freqs = freqs.updated(y, freqs.getOrElse(y, 0) + 1)
    +        }
    +        lastX = x
    +        lastY = y
    +      }
    +
    +      // we add last element as a candidate threshold for convenience 
    +      result = (lastX, freqs) +: result
    +
    +      result.reverse.toIterator
    +    }).persist(StorageLevel.MEMORY_AND_DISK)
    +  }
    +  
    +  /**
    +   * Returns a sequence of doubles that define the intervals to make the discretization.
    +   *
    +   * @param candidates RDD of (value, label) pairs
    +   */
    +  private def getThresholds(candidates: RDD[(Double, Map[String,Int])]): Seq[Double] = {
    +
    +    //Create queue
    +    val stack = new mutable.Queue[((Double, Double), Option[Double])]
    +
    +    //Insert first in the stack
    +    stack.enqueue(((Double.NegativeInfinity, Double.PositiveInfinity), None))
    +    var result = Seq(Double.NegativeInfinity)
    +
    +    // While more elements to eval, continue
    +    while(stack.length > 0 && result.size < this.maxBins){
    +
    +      val (bounds, lastThresh) = stack.dequeue
    +
    +      var cands = candidates.filter({ case (th, _) => th > bounds._1 && th <= bounds._2 })
    +      val nCands = cands.count
    +      if (nCands > 0) {
    +        cands = cands.coalesce(partitions(nCands))
    +
    +        evalThresholds(cands, lastThresh) match {
    +          case Some(th) =>
    +            result = th +: result
    +            stack.enqueue(((bounds._1, th), Some(th)))
    +            stack.enqueue(((th, bounds._2), Some(th)))
    +          case None => /* criteria not fulfilled, finish */
    +        }
    +      }
    +    }
    +    (Double.PositiveInfinity +: result).sorted
    +  }
    +
    +  /**
    +   * Selects one final thresholds among the candidates and returns two partitions to recurse
    +   *
    +   * @param candidates RDD of (candidate, class frequencies between last and current candidate)
    +   * @param lastSelected last selected threshold to avoid selecting it again
    +   */
    +  private def evalThresholds(
    +      candidates: RDD[(Double, Map[String, Int])],
    +      lastSelected : Option[Double]) = {
    +
    +    var result = candidates.map({
    +      case (cand, freqs) =>
    +        (cand, freqs, Seq.empty[Int], Seq.empty[Int])
    +    }).cache
    +
    +    val numPartitions = candidates.partitions.size
    +    val bc_numPartitions = candidates.context.broadcast(numPartitions)
    +
    +    // stores accumulated freqs from left to right
    +    val l_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +    // stores accumulated freqs from right to left
    +    val r_total = candidates.context.accumulator(Map.empty[String, Int])(MapAccumulator)
    +
    +    // calculates accumulated frequencies for each candidate
    +    (0 until numPartitions) foreach { l2r_i =>
    +
    +      val bc_l_total = l_total.value
    +      val bc_r_total = r_total.value
    +
    +      result =
    +        result.mapPartitionsWithIndex({ (slice, it) =>
    +
    +          val l2r = slice == l2r_i
    +          val r2l = slice == bc_numPartitions.value - 1 - l2r_i
    +
    +          if (l2r && r2l) {
    +
    +            // accumulate both from left to right and right to left
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +
    +            for ((cand, freqs, _, r_freqs) <- it) {
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            l_total += accum
    +
    +            val r2lIt = partialResult.iterator
    +            partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            accum = Map.empty[String, Int]
    +            for ((cand, freqs, l_freqs, _) <- r2lIt) {
    +              val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +            r_total += accum
    +
    +            partialResult.iterator
    +
    +          } else if (l2r) {
    +
    +            // accumulate freqs from left to right
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +
    +            for ((cand, freqs, _, r_freqs) <- it) {
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              val l_freqs = Utils.sumFreqMaps(accum, bc_l_total).values.toList
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            l_total += accum
    +            partialResult.reverseIterator
    +
    +          } else if (r2l) {
    +
    +            // accumulate freqs from right to left
    +            var partialResult = Seq.empty[(Double, Map[String, Int], Seq[Int], Seq[Int])]
    +            var accum = Map.empty[String, Int]
    +            val r2lIt = it.toSeq.reverseIterator
    +
    +            for ((cand, freqs, l_freqs, _) <- r2lIt) {
    +              val r_freqs = Utils.sumFreqMaps(accum, bc_r_total).values.toList
    +              accum = Utils.sumFreqMaps(accum, freqs)
    +              partialResult = (cand, freqs, l_freqs, r_freqs) +: partialResult
    +            }
    +
    +            r_total += accum
    +
    +            partialResult.iterator
    +
    +          } else {
    +            // do nothing in this iteration
    +            it
    +          }
    +        }, true) // important to maintain partitions within the loop
    +        .persist(StorageLevel.MEMORY_AND_DISK) // needed, otherwise spark repeats calculations
    +
    +      result.foreachPartition({ _ => }) // Forces the iteration to be calculated
    +    }
    +
    +    // calculate h(S)
    +    // s: number of elements
    +    // k: number of distinct classes
    +    // hs: entropy
    +
    +    val s  = l_total.value.values.reduce(_ + _)
    +    val hs = InfoTheory.entropy(l_total.value.values.toSeq, s)
    +    val k  = l_total.value.values.size
    +
    +    // select best threshold according to the criteria
    +    val final_candidates =
    +      result.flatMap({
    +        case (cand, _, l_freqs, r_freqs) =>
    +
    +          val k1  = l_freqs.size
    +          val s1  = if (k1 > 0) l_freqs.reduce(_ + _) else 0
    +          val hs1 = InfoTheory.entropy(l_freqs, s1)
    +
    +          val k2  = r_freqs.size
    +          val s2  = if (k2 > 0) r_freqs.reduce(_ + _) else 0
    +          val hs2 = InfoTheory.entropy(r_freqs, s2)
    +
    +          val weighted_hs = (s1 * hs1 + s2 * hs2) / s
    +          val gain        = hs - weighted_hs
    +          val delta       = Utils.log2(3 ^ k - 2) - (k * hs - k1 * hs1 - k2 * hs2)
    +          var criterion   = (gain - (Utils.log2(s - 1) + delta) / s) > -1e-5
    +
    +          lastSelected match {
    +              case None =>
    +              case Some(last) => criterion = criterion && (cand != last)
    +          }
    +
    +          if (criterion) {
    +            Seq((weighted_hs, cand))
    +          } else {
    +            Seq.empty[(Double, Double)]
    +          }
    +      })
    +
    +    // choose best candidates and partition data to make recursive calls
    +    if (final_candidates.count > 0) {
    +      val selected_threshold = final_candidates.reduce({ case ((whs1, cand1), (whs2, cand2)) =>
    +        if (whs1 < whs2) (whs1, cand1) else (whs2, cand2)
    +      })._2;
    +      Some(selected_threshold)
    +    } else {
    +      None
    +    }
    +
    +  }
    +
    +  /**
    +   * Discretizes a value with a set of intervals.
    +   *
    +   * @param value The value to be discretized
    +   * @param thresholds Thresholds used to asign a discrete value
    +   */
    +  private def assignDiscreteValue(value: Double, thresholds: Seq[Double]) = {
    +    var aux = thresholds.zipWithIndex
    +    while (value > aux.head._1) aux = aux.tail
    +    aux.head._2
    +  }
    +
    +  /**
    +   * Discretizes an RDD of (label, array of values) pairs.
    +   */
    +  def discretize: RDD[LabeledPoint] = {
    +    // calculate thresholds that aren't already calculated
    +    getThresholdsForContinuousFeatures
    +
    +    val bc_thresholds = this.data.context.broadcast(this.thresholds)
    +
    +    // applies thresholds to discretize every continuous feature
    +    data.map {
    +      case LabeledPoint(label, values) =>
    +        LabeledPoint(label,
    +          values.zipWithIndex map {
    --- End diff --
    
    Change `zipWithIndex map` to `zipWithIndex.map`. Please check
    https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide
    for infix usage.
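
The two spellings mentioned in this comment are equivalent to the compiler, so the request is purely about style. A minimal sketch with made-up values (the `InfixDemo` object and its data are assumptions for illustration, not code from the pull request):

```scala
object InfixDemo {
  val values: Array[Double] = Array(0.2, 0.7, 0.4)

  // Infix form, as written in the diff under review.
  val infix: Array[(Int, Double)] =
    values.zipWithIndex map { case (v, i) => (i, v) }

  // Dot form, as the reviewer requests. Same result, clearer call chain.
  val dotted: Array[(Int, Double)] =
    values.zipWithIndex.map { case (v, i) => (i, v) }
}
```

Infix notation is usually reserved for symbolic, operator-like methods such as `+` or `::`.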



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r11167994
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizerSuite.scala ---
    @@ -0,0 +1,71 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.util.Random
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import org.apache.spark.mllib.util.LocalSparkContext
    +
    +object EntropyMinimizationDiscretizerSuite {
    +  val nFeatures = 5
    +  val nDatapoints = 50
    +  val nLabels = 3
    +  val nPartitions = 3
    +
    +  def generateLabeledData: Array[LabeledPoint] = {
    +    val rnd = new Random(42)
    +    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +
    +    Array.fill[LabeledPoint](nDatapoints) {
    +      LabeledPoint(labels(rnd.nextInt(nLabels)),
    +        Array.fill[Double](nFeatures)(rnd.nextDouble))
    +    }
    +  }
    +}
    +
    +class EntropyMinimizationDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +  test("EMD discretization") {
    +    val rnd = new Random(13)
    +		
    +		val data = for (i <- 1 to 99) yield
    --- End diff --
    
    Please fix indentation in this file.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10942149
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/EntropyMinimizationDiscretizer.scala ---
    @@ -0,0 +1,402 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import scala.collection.mutable.Stack
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.util.InfoTheory
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.collection.mutable
    +
    +
    +/**
    + * This class contains methods to discretize continuous values with the method proposed in
    + * [Fayyad and Irani, Multi-Interval Discretization of Continuous-Valued Attributes, 1993]
    + */
    +class EntropyMinimizationDiscretizer private (
    +    @transient var data: RDD[LabeledPoint],
    +    @transient var continousFeatures: Seq[Int],
    +    var elementsPerPartition: Int = 18000,
    --- End diff --
    
    Could you explain the reason for setting `18000` as the default?



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10898397
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/discretization/EMDDiscretizerSuite.scala ---
    @@ -0,0 +1,60 @@
    +package org.apache.spark.mllib.discretization
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.regression.LabeledPoint
    +import scala.util.Random
    +import org.apache.spark.mllib.util.InfoTheory
    +
    +object EMDDiscretizerSuite {
    +    val nFeatures = 5
    +    val nDatapoints = 50
    +    val nLabels = 3
    +    val nPartitions = 3
    +	    
    +	def generateLabeledData : Array[LabeledPoint] =
    +	{
    +        
    +	    val rnd = new Random(42)
    +	    val labels = Array.fill[Double](nLabels)(rnd.nextDouble)
    +	    	    
    +	    Array.fill[LabeledPoint](nDatapoints) {
    +	        LabeledPoint(labels(rnd.nextInt(nLabels)),
    +	                     Array.fill[Double](nFeatures)(rnd.nextDouble))
    +	    } 
    +	}
    +}
    +
    +class EMDDiscretizerSuite extends FunSuite with LocalSparkContext {
    +    
    +    test("EMD discretization") {
    +        val rnd = new Random()
    +		
    +		val data =
    +		for (i <- 1 to 99) yield
    +			if (i <= 33) {
    +			    LabeledPoint(1.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +			} else if (i <= 66) {
    +			    LabeledPoint(2.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +			} else {
    +			    LabeledPoint(3.0, Array(i.toDouble + rnd.nextDouble*2 - 1))
    +			}
    +		
    +		val shuffledData = data.sortWith((lp1, lp2) => if (rnd.nextDouble < 0.5) true else false)
    --- End diff --
    
    `=> rnd.nextDouble < 0.5` (do not need `if ... else`)
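
The simplification works because the comparison already evaluates to a `Boolean`. As a side note that goes beyond the reviewer's remark, a random comparator is not a consistent ordering, so sorting with it is not guaranteed to give a uniform shuffle. The sketch below shows the simplified comparator and, as an alternative that is an assumption of this example rather than something proposed in the thread, the standard `scala.util.Random#shuffle`. The `ShuffleSketch` object and its data are made up.

```scala
import scala.util.Random

object ShuffleSketch {
  val rnd = new Random(13)
  val data: Vector[Int] = (1 to 99).toVector

  // Reviewer's simplification: no `if ... else` around a Boolean expression.
  // Shown as a function value only; it is not used for sorting here.
  val randomLess: (Int, Int) => Boolean =
    (a: Int, b: Int) => rnd.nextDouble() < 0.5

  // Alternative: let the library produce the permutation directly.
  val shuffled: Vector[Int] = rnd.shuffle(data)
}
```

With a fixed seed the shuffled order is reproducible across runs, which tends to matter in a test suite.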



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38607527
  
    One or more automated tests failed
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13438/



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10947586
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala ---
    @@ -0,0 +1,40 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +private[discretization] object MapAccumulator extends AccumulatorParam[Map[String, Int]] {
    +
    +  def addInPlace(map1: Map[String, Int], map2: Map[String, Int]): Map[String, Int] = {
    +
    +    if (map1 isEmpty) {
    --- End diff --
    
    No infix (operator) notation: use `map1.isEmpty`.
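
For reference, a merge of two class-frequency maps written with dot notation throughout might look like the sketch below. It is a hypothetical reimplementation for illustration; the rest of `addInPlace` is not visible in the quoted diff, and `FreqMaps.sumFreqMaps` here is not the pull request's `Utils.sumFreqMaps`.

```scala
object FreqMaps {
  /** Adds the counts of `b` into `a`, key by key. */
  def sumFreqMaps(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    if (a.isEmpty) {
      b // `a.isEmpty`, not `a isEmpty`
    } else if (b.isEmpty) {
      a
    } else {
      b.foldLeft(a) { case (acc, (label, n)) =>
        acc.updated(label, acc.getOrElse(label, 0) + n)
      }
    }
}
```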



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38944614
  
    I've made some big changes:
    1. A big refactor of the discretizer's architecture. I think it is now more consistent with other packages in MLlib.
    2. I've replaced the `MapAccumulator` with an `ArrayAccumulator`, which removes the quadratic complexity and improves the overall efficiency of the algorithm.
    3. I've followed @mengxr's suggestions and moved `InfoTheory` to the `discretization` package.
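
The idea behind an array-based accumulator can be sketched without Spark: labels are mapped to sequential indexes once, and class frequencies then live in fixed-size arrays that merge by element-wise addition. Everything below (`ArrayFreqSketch`, the sample labels, the helper names) is an assumption for illustration; the actual `ArrayAccumulator` code is not shown in this thread.

```scala
object ArrayFreqSketch {
  val labels: Seq[String] = Seq("yes", "no", "yes", "maybe")

  // Map each distinct label to a sequential index, and keep the inverse
  // so the original labels can be restored at the end.
  val labelIndex: Map[String, Int] = labels.distinct.zipWithIndex.toMap
  val indexLabel: Map[Int, String] = labelIndex.map(_.swap)

  /** Counts label occurrences into an array indexed by label index. */
  def counts(ls: Seq[String]): Array[Int] = {
    val c = new Array[Int](labelIndex.size)
    ls.foreach(l => c(labelIndex(l)) += 1)
    c
  }

  /** Element-wise addition into `a`: one pass, no per-key map updates. */
  def addInPlace(a: Array[Int], b: Array[Int]): Array[Int] = {
    require(a.length == b.length, "frequency arrays must have equal length")
    var i = 0
    while (i < a.length) {
      a(i) += b(i)
      i += 1
    }
    a
  }
}
```

Merging two frequency arrays this way costs time linear in the number of classes.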



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10897825
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/MapAccumulator.scala ---
    @@ -0,0 +1,53 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +* http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.mllib.discretization
    +
    +import org.apache.spark.AccumulatorParam
    +
    +object MapAccumulator extends AccumulatorParam[Map[String, Int]] {
    --- End diff --
    
    Mark it package private if this is not intended to be used by users.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by LIDIAgroup <gi...@git.apache.org>.
Github user LIDIAgroup commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38718686
  
    I'll make some changes that, IMHO, will improve the discretizer in some respects:
    1. I'll change the accumulator from a `Map` to an `Array`. This implies collecting all distinct labels and mapping them to sequential `Int`s at the beginning, then reversing that mapping at the end. I wanted to avoid this step, but after reading @mengxr's concerns about the complexity of `MapAccumulator`, I think it's worth doing.
    2. I propose changing the interface to a `train(data, featureIndexes)` function that works out and stores the thresholds to apply to each feature, and a `discretize(data)` function that applies those thresholds to the given data. The scope for the training data and the data to discretize being different is quite limited, since the indexes of the features to discretize have to be the same, but this way is probably clearer.
    3. My intention with `discretize` returning an `RDD[_]` was that future discretizers could be applied to data other than `LabeledPoint`s (in fact, `LabeledPoint` is placed under `mllib.regression`, so it doesn't seem to be a standard). The reason is that labels, unlike in the present case, are not always needed to discretize.
    
    I'm going to work on point 1 now, but I'd be glad to have your opinion on the other points. If you have a different proposal for what the interface of the discretizer should be, I'm willing to discuss it and try to stick to it.
    
    PS: I'm very sorry about my mistakes concerning code conventions. I'll be more careful about that in the future.
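
The `train` / `discretize` split proposed in point 2 can be sketched on plain collections. This is a minimal illustration under stated assumptions: `Point` stands in for `LabeledPoint`, `Seq` stands in for `RDD`, all names are hypothetical, and the "training" step is a midpoint placeholder rather than the entropy-minimization method.

```scala
/** Stand-in for LabeledPoint in this sketch. */
final case class Point(label: Double, features: Array[Double])

/** Holds the learned thresholds and applies them to new data. */
class SketchModel(val thresholds: Map[Int, Seq[Double]]) {
  def discretize(data: Seq[Point]): Seq[Point] = data.map { p =>
    val discretized = p.features.zipWithIndex.map { case (v, i) =>
      thresholds.get(i) match {
        // Bin index = number of thresholds strictly below the value.
        case Some(ths) => ths.count(_ < v).toDouble
        // Features that were not trained are passed through unchanged.
        case None => v
      }
    }
    Point(p.label, discretized)
  }
}

object SketchDiscretizer {
  /**
   * Placeholder training step: one threshold per selected feature, at the
   * midpoint between its minimum and maximum. NOT the Fayyad-Irani criterion.
   */
  def train(data: Seq[Point], featureIndexes: Seq[Int]): SketchModel = {
    val ths = featureIndexes.map { i =>
      val vs = data.map(_.features(i))
      i -> Seq((vs.min + vs.max) / 2)
    }.toMap
    new SketchModel(ths)
  }
}
```

Separating the two steps means the returned model carries only the thresholds, so it can be applied to any data set that uses the same feature indexes.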



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38591501
  
    Jenkins, retest this please.



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/216#discussion_r10948233
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/discretization/Utils.scala ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.discretization
    +
    +object Utils {
    +
    +  implicit class MyRichSeq[T](val seq: Seq[T]) extends AnyVal {
    +
    +    def apply(indexes: Seq[Int]): Option[Seq[T]] = {
    +      if (indexes.length == 0) {
    +        None
    +      } else {
    +        Some(indexes.map(i => seq(i)))
    +      }
    +    }
    +  }
    +
    +  def sumFreqMaps[A](map1: Map[A, Int],
    --- End diff --
    
    If this is only used in `EntropyMinimizationDiscretizer`, let's move it there and mark it private.
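
A minimal sketch of what that suggestion could look like. The diff above is cut off after the first parameter of `sumFreqMaps`, so the second parameter and the body shown here are assumptions, as are the enclosing object name `DiscretizerHelpersSketch` and the public method `totalFreqs`.

```scala
object DiscretizerHelpersSketch {

  // Hypothetical completion: only `map1: Map[A, Int]` is visible in the diff.
  // Marked private so the helper is not reachable from outside this object.
  private def sumFreqMaps[A](map1: Map[A, Int], map2: Map[A, Int]): Map[A, Int] =
    map2.foldLeft(map1) { case (acc, (key, count)) =>
      acc.updated(key, acc.getOrElse(key, 0) + count)
    }

  /** Public entry point that relies on the private helper. */
  def totalFreqs[A](maps: Seq[Map[A, Int]]): Map[A, Int] =
    maps.foldLeft(Map.empty[A, Int])((acc, m) => sumFreqMaps(acc, m))
}
```

Keeping the helper private to the one object that uses it avoids a general-purpose `Utils` object with a single caller.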



[GitHub] spark pull request: [SPARK-1303] [MLLIB] Added discretization capa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/216#issuecomment-38592747
  
    Merged build triggered.

