Posted to user@spark.apache.org by Sneha Shukla <sn...@gmail.com> on 2016/06/22 04:39:56 UTC

Fwd: 'numBins' property not honoured in BinaryClassificationMetrics class when spark.default.parallelism is not set to 1

Hi,

I'm trying to use the BinaryClassificationMetrics class to compute the PR
curve, as shown below:

import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by sneha.shukla on 17/06/16.
  */

object TestCode {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local")

    sparkConf.set("spark.default.parallelism", "1")

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // LabelledData is an application-specific class, not shown in this snippet
    sparkConf.registerKryoClasses(Array(classOf[GenericRecord],
      classOf[LabelledData], classOf[Configuration]))

    val sc = new SparkContext(sparkConf)

    val jobConf = new JobConf(sc.hadoopConfiguration)

    val rdd = sc.hadoopFile(
      "sampleData",
      classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
      classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],2)

    println("Original Partitions : "+rdd.partitions.size)

    val anotherRDD = rdd.map(row => row._1.datum).map(rowValue =>
      rowValue.get("value").toString.split("\\|"))

    println("Another RDD partitions : "+anotherRDD.partitions.size)

    // Buffer captured by the mapPartitions closure (each task works on its own copy)
    val res = scala.collection.mutable.ListBuffer[(Double, Double)]()

    val yetAnotherRDD = anotherRDD.mapPartitions[(Double, Double)](iterator => {
      while (iterator.hasNext) {
        val array = iterator.next
        val iter = array.iterator
        val prediction = iter.next.toDouble
        val label = iter.next.toDouble
        val t = (prediction, label)
        res += t
      }
      res.iterator
    }).map(doubles => (doubles._1, doubles._2))

    println("yet anohter rdd partitions : "+yetAnotherRDD.partitions.size)

    //Sample data in yetAnotherRDD
//    (0.0025952152930881676,0.0)
//    (8.08581095750238E-5,0.0)
//    (0.1420529729314534,0.0)
//    (1.287933787473423,0.0)
//    (0.007534799826226573,0.0)
//    (0.008488829931163747,0.0)
//    (1.441921051791096,0.0)
//    (0.0036552783890398343,0.0)
//    (2.3833004789198267,0.0)
//    (0.3695065893117973,0.0)

    //Metrics Calculation. Explicitly setting numBins to 10
    val metrics = new BinaryClassificationMetrics(yetAnotherRDD, 10)

    val pr = metrics.pr().collect()

    val thr = metrics.thresholds().collect()

    val joined = metrics.precisionByThreshold().join(metrics.recallByThreshold()).collect()

    println(joined.size)

    println(thr.size)

    println(pr.size)
  }

}

In local mode, my machine has 2 cores, and hence I set minPartitions on the
original RDD to 2 (based on suggestions here:
http://apache-spark-user-list.1001560.n3.nabble.com/Error-when-testing-with-large-sparse-svm-td9592.html#a10010).
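
As a quick sanity check (just a sketch that reuses the sc, rdd and anotherRDD
from the code above), the effective parallelism and partition counts can be
printed like this:

    // Sketch: confirm the parallelism/partition values actually in effect,
    // reusing the sc, rdd and anotherRDD defined in the snippet above
    println("sc.defaultParallelism : " + sc.defaultParallelism)
    println("rdd partitions        : " + rdd.partitions.size)
    println("anotherRDD partitions : " + anotherRDD.partitions.size)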

However, after experimenting a bit, it turns out that the numBins property
of the BinaryClassificationMetrics class is not honoured when the
"spark.default.parallelism" property is not set to 1.
AFAIU, numBins should downsample the input RDD, as documented here:
https://spark.apache.org/docs/1.6.1/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html


 When "spark.default.parallelism" is set to 1, the size of the thesholds
and pr curve is near about the numBins, as documented here
In case I make it 100, the size of the thresholds in the
BinaryClassification class becomes ~100 and so on.
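
To make this easier to reproduce without the Avro input, here is a minimal,
self-contained sketch (the object name, the synthetic data and the local[2]
master are made up for illustration); toggling the spark.default.parallelism
line is enough to compare the printed sizes:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object NumBinsCheck {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("NumBinsCheck").setMaster("local[2]")
    // Compare runs with "1", "2", "100", or with this line removed entirely
    conf.set("spark.default.parallelism", "1")

    val sc = new SparkContext(conf)

    // 10,000 synthetic (score, label) pairs spread over 2 input partitions
    val rng = new Random(42)
    val scoreAndLabels = sc.parallelize(
      Seq.fill(10000)((rng.nextDouble(), if (rng.nextBoolean()) 1.0 else 0.0)), 2)

    val numBins = 10
    val metrics = new BinaryClassificationMetrics(scoreAndLabels, numBins)

    println("numBins requested : " + numBins)
    println("thresholds size   : " + metrics.thresholds().count())
    println("pr curve size     : " + metrics.pr().count())

    sc.stop()
  }
}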

Am I missing something here? If the dataset on which the PR curve is being
computed is huge, wouldn't setting parallelism to 1 hurt performance?

I am using Spark 1.6.1 in local mode for this experiment. Using Spark 1.5.1
in cluster mode gives similar results.

Any pointers/help would be appreciated!

Thanks!

Re: 'numBins' property not honoured in BinaryClassificationMetrics class when spark.default.parallelism is not set to 1

Posted by Sean Owen <so...@cloudera.com>.
Why do you say it's not honored -- what do you observe? Looking at the
code, it does not seem to depend on the RDD's parallelism. Can you
narrow this down to a shorter example?
