You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Bettadapura Srinath Sharma (JIRA)" <ji...@apache.org> on 2017/05/19 05:13:04 UTC

[jira] [Updated] (SPARK-20803) KernelDensity.estimate in pyspark.mllib.stat.KernelDensity throws net.razorvine.pickle.PickleException when input data is normally distributed (no error when data is not normally distributed)

     [ https://issues.apache.org/jira/browse/SPARK-20803?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bettadapura Srinath Sharma updated SPARK-20803:
-----------------------------------------------
    Description: 
When data is NOT normally distributed (correct behavior):
This code:
    	vecRDD = sc.parallelize(colVec)
        kd = KernelDensity()
        kd.setSample(vecRDD)
        kd.setBandwidth(3.0)
        # Find density estimates for the given values
        densities = kd.estimate(samplePoints)
produces:
17/05/18 15:40:36 INFO SparkContext: Starting job: aggregate at KernelDensity.scala:92
17/05/18 15:40:36 INFO DAGScheduler: Got job 21 (aggregate at KernelDensity.scala:92) with 1 output partitions
17/05/18 15:40:36 INFO DAGScheduler: Final stage: ResultStage 24 (aggregate at KernelDensity.scala:92)
17/05/18 15:40:36 INFO DAGScheduler: Parents of final stage: List()
17/05/18 15:40:36 INFO DAGScheduler: Missing parents: List()
17/05/18 15:40:36 INFO DAGScheduler: Submitting ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345), which has no missing parents
17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 6.6 KB, free 413.6 MB)
17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 3.6 KB, free 413.6 MB)
17/05/18 15:40:36 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 192.168.0.115:38645 (size: 3.6 KB, free: 413.9 MB)
17/05/18 15:40:36 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:996
17/05/18 15:40:36 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345)
17/05/18 15:40:36 INFO TaskSchedulerImpl: Adding task set 24.0 with 1 tasks
17/05/18 15:40:36 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 24, localhost, executor driver, partition 0, PROCESS_LOCAL, 96186 bytes)
17/05/18 15:40:36 INFO Executor: Running task 0.0 in stage 24.0 (TID 24)
17/05/18 15:40:37 INFO PythonRunner: Times: total = 66, boot = -1831, init = 1844, finish = 53
17/05/18 15:40:37 INFO Executor: Finished task 0.0 in stage 24.0 (TID 24). 2476 bytes result sent to driver
17/05/18 15:40:37 INFO DAGScheduler: ResultStage 24 (aggregate at KernelDensity.scala:92) finished in 1.001 s
17/05/18 15:40:37 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 24) in 1004 ms on localhost (executor driver) (1/1)
17/05/18 15:40:37 INFO TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool 
17/05/18 15:40:37 INFO DAGScheduler: Job 21 finished: aggregate at KernelDensity.scala:92, took 1.136263 s
17/05/18 15:40:37 INFO BlockManagerInfo: Removed broadcast_25_piece0 on 192.168.0.115:38645 in memory (size: 3.6 KB, free: 413.9 MB)
5.6654703477e-05,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001
,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,

But if Data IS normally distributed:

I see:
17/05/18 15:50:16 ERROR Executor: Exception in task 0.0 in stage 24.0 (TID 24)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

On Scala, the correct result is:
Code:
	vecRDD = sc.parallelize(colVec)
        kd = new KernelDensity().setSample(vecRDD).setBandwidth(3.0)
		
        // Find density estimates for the given values
        densities = kd.estimate(samplePoints)

[0.04113814235801906,1.0994865517293571E-163,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,]

  was:
When data is NOT normally distributed (correct behavior):
This code:
    	vecRDD = sc.parallelize(colVec)
        kd = KernelDensity()
        kd.setSample(vecRDD)
        kd.setBandwidth(3.0)
        # Find density estimates for the given values
        densities = kd.estimate(samplePoints)
produces:
17/05/18 15:40:36 INFO SparkContext: Starting job: aggregate at KernelDensity.scala:92
17/05/18 15:40:36 INFO DAGScheduler: Got job 21 (aggregate at KernelDensity.scala:92) with 1 output partitions
17/05/18 15:40:36 INFO DAGScheduler: Final stage: ResultStage 24 (aggregate at KernelDensity.scala:92)
17/05/18 15:40:36 INFO DAGScheduler: Parents of final stage: List()
17/05/18 15:40:36 INFO DAGScheduler: Missing parents: List()
17/05/18 15:40:36 INFO DAGScheduler: Submitting ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345), which has no missing parents
17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 6.6 KB, free 413.6 MB)
17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 3.6 KB, free 413.6 MB)
17/05/18 15:40:36 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 192.168.0.115:38645 (size: 3.6 KB, free: 413.9 MB)
17/05/18 15:40:36 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:996
17/05/18 15:40:36 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345)
17/05/18 15:40:36 INFO TaskSchedulerImpl: Adding task set 24.0 with 1 tasks
17/05/18 15:40:36 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 24, localhost, executor driver, partition 0, PROCESS_LOCAL, 96186 bytes)
17/05/18 15:40:36 INFO Executor: Running task 0.0 in stage 24.0 (TID 24)
17/05/18 15:40:37 INFO PythonRunner: Times: total = 66, boot = -1831, init = 1844, finish = 53
17/05/18 15:40:37 INFO Executor: Finished task 0.0 in stage 24.0 (TID 24). 2476 bytes result sent to driver
17/05/18 15:40:37 INFO DAGScheduler: ResultStage 24 (aggregate at KernelDensity.scala:92) finished in 1.001 s
17/05/18 15:40:37 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 24) in 1004 ms on localhost (executor driver) (1/1)
17/05/18 15:40:37 INFO TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool 
17/05/18 15:40:37 INFO DAGScheduler: Job 21 finished: aggregate at KernelDensity.scala:92, took 1.136263 s
17/05/18 15:40:37 INFO BlockManagerInfo: Removed broadcast_25_piece0 on 192.168.0.115:38645 in memory (size: 3.6 KB, free: 413.9 MB)
5.6654703477e-05,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001
,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,

But if Data IS normally distributed:

I see:
17/05/18 15:50:16 ERROR Executor: Exception in task 0.0 in stage 24.0 (TID 24)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)



> KernelDensity.estimate in pyspark.mllib.stat.KernelDensity throws net.razorvine.pickle.PickleException when input data is normally distributed (no error when data is not normally distributed)
> -----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-20803
>                 URL: https://issues.apache.org/jira/browse/SPARK-20803
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark
>    Affects Versions: 2.1.1
>         Environment: Linux version 4.4.14-smp
> x86/fpu: Legacy x87 FPU detected.
> using command line: 
> bash-4.3$ ./bin/spark-submit ~/work/python/Features.py
> bash-4.3$ pwd
> /home/bsrsharma/spark-2.1.1-bin-hadoop2.7
> export JAVA_HOME=/home/bsrsharma/jdk1.8.0_121
>            Reporter: Bettadapura Srinath Sharma
>
> When data is NOT normally distributed (correct behavior):
> This code:
>     	vecRDD = sc.parallelize(colVec)
>         kd = KernelDensity()
>         kd.setSample(vecRDD)
>         kd.setBandwidth(3.0)
>         # Find density estimates for the given values
>         densities = kd.estimate(samplePoints)
> produces:
> 17/05/18 15:40:36 INFO SparkContext: Starting job: aggregate at KernelDensity.scala:92
> 17/05/18 15:40:36 INFO DAGScheduler: Got job 21 (aggregate at KernelDensity.scala:92) with 1 output partitions
> 17/05/18 15:40:36 INFO DAGScheduler: Final stage: ResultStage 24 (aggregate at KernelDensity.scala:92)
> 17/05/18 15:40:36 INFO DAGScheduler: Parents of final stage: List()
> 17/05/18 15:40:36 INFO DAGScheduler: Missing parents: List()
> 17/05/18 15:40:36 INFO DAGScheduler: Submitting ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345), which has no missing parents
> 17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 6.6 KB, free 413.6 MB)
> 17/05/18 15:40:36 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 3.6 KB, free 413.6 MB)
> 17/05/18 15:40:36 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 192.168.0.115:38645 (size: 3.6 KB, free: 413.9 MB)
> 17/05/18 15:40:36 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:996
> 17/05/18 15:40:36 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 24 (MapPartitionsRDD[44] at mapPartitions at PythonMLLibAPI.scala:1345)
> 17/05/18 15:40:36 INFO TaskSchedulerImpl: Adding task set 24.0 with 1 tasks
> 17/05/18 15:40:36 INFO TaskSetManager: Starting task 0.0 in stage 24.0 (TID 24, localhost, executor driver, partition 0, PROCESS_LOCAL, 96186 bytes)
> 17/05/18 15:40:36 INFO Executor: Running task 0.0 in stage 24.0 (TID 24)
> 17/05/18 15:40:37 INFO PythonRunner: Times: total = 66, boot = -1831, init = 1844, finish = 53
> 17/05/18 15:40:37 INFO Executor: Finished task 0.0 in stage 24.0 (TID 24). 2476 bytes result sent to driver
> 17/05/18 15:40:37 INFO DAGScheduler: ResultStage 24 (aggregate at KernelDensity.scala:92) finished in 1.001 s
> 17/05/18 15:40:37 INFO TaskSetManager: Finished task 0.0 in stage 24.0 (TID 24) in 1004 ms on localhost (executor driver) (1/1)
> 17/05/18 15:40:37 INFO TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool 
> 17/05/18 15:40:37 INFO DAGScheduler: Job 21 finished: aggregate at KernelDensity.scala:92, took 1.136263 s
> 17/05/18 15:40:37 INFO BlockManagerInfo: Removed broadcast_25_piece0 on 192.168.0.115:38645 in memory (size: 3.6 KB, free: 413.9 MB)
> 5.6654703477e-05,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001
> ,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> 0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,0.000100010001,
> But if Data IS normally distributed:
> I see:
> 17/05/18 15:50:16 ERROR Executor: Exception in task 0.0 in stage 24.0 (TID 24)
> net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
> On Scala, the correct result is:
> Code:
> 	vecRDD = sc.parallelize(colVec)
>         kd = new KernelDensity().setSample(vecRDD).setBandwidth(3.0)
> 		
>         // Find density estimates for the given values
>         densities = kd.estimate(samplePoints)
> [0.04113814235801906,1.0994865517293571E-163,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org