You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2016/03/25 20:23:25 UTC

[jira] [Commented] (SPARK-14163) SumEvaluator and countApprox cannot reliably handle RDDs of size 1

    [ https://issues.apache.org/jira/browse/SPARK-14163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212280#comment-15212280 ] 

Sean Owen commented on SPARK-14163:
-----------------------------------

Yeah, if n <= 1, stuff is already messed up here since the variance will be undefined. If n == 0 I suppose you'd have the same case as when 0 outputs have been merged and can return the same result straight away. When n == 1 I suppose the same thing can be returned, but in that case at least sumEstimate is valid.

> SumEvaluator and countApprox cannot reliably handle RDDs of size 1
> ------------------------------------------------------------------
>
>                 Key: SPARK-14163
>                 URL: https://issues.apache.org/jira/browse/SPARK-14163
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.2, 1.6.0, 1.6.1, 2.0.0
>            Reporter: Marcin Tustin
>
> The bug exists in these lines: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala#L59-L61
> In this code
> {code:title=SumEvaluator.scala|borderStyle=solid}
>           val degreesOfFreedom = (counter.count - 1).toInt
>           new TDistribution(degreesOfFreedom).inverseCumulativeProbability(1 - (1 - confidence) / 2)
> {code}
> If {{counter.count}} is 1 or 0 then {{new TDistribution(degreesOfFreedom)}} will raise an exception because {{TDistribution}} expects its {{degreesOfFreedom}} parameter to be 1 or greater.
> An example (written in pyspark):
> {noformat}
> >>> rdd = sc.parallelize([1])
> >>> rdd.countApprox(1000,0.5)
> 16/03/25 18:09:36 INFO SparkContext: Starting job: sumApprox at NativeMethodAccessorImpl.java:-2
> 16/03/25 18:09:36 INFO DAGScheduler: Got job 1 (sumApprox at NativeMethodAccessorImpl.java:-2) with 2 output partitions
> 16/03/25 18:09:36 INFO DAGScheduler: Final stage: ResultStage 1(sumApprox at NativeMethodAccessorImpl.java:-2)
> 16/03/25 18:09:36 INFO DAGScheduler: Parents of final stage: List()
> 16/03/25 18:09:36 INFO DAGScheduler: Missing parents: List()
> 16/03/25 18:09:36 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147), which has no missing parents
> 16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(4328) called with curMem=7140, maxMem=555755765
> 16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.2 KB, free 530.0 MB)
> 16/03/25 18:09:36 INFO MemoryStore: ensureFreeSpace(2821) called with curMem=11468, maxMem=555755765
> 16/03/25 18:09:36 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.8 KB, free 530.0 MB)
> 16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.5.5.158:56348 (size: 2.8 KB, free: 530.0 MB)
> 16/03/25 18:09:36 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
> 16/03/25 18:09:36 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[6] at mapPartitions at SerDeUtil.scala:147)
> 16/03/25 18:09:36 INFO YarnScheduler: Adding task set 1.0 with 2 tasks
> 16/03/25 18:09:36 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, r-hadoopeco-data-66215afe.hbinternal.com, PROCESS_LOCAL, 2071 bytes)
> 16/03/25 18:09:36 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, r-hadoopeco-data-84205b1c.hbinternal.com, PROCESS_LOCAL, 2090 bytes)
> 16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on r-hadoopeco-data-66215afe.hbinternal.com:43011 (size: 2.8 KB, free: 530.0 MB)
> 16/03/25 18:09:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 66 ms on r-hadoopeco-data-66215afe.hbinternal.com (1/2)
> 16/03/25 18:09:36 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on r-hadoopeco-data-84205b1c.hbinternal.com:41613 (size: 2.8 KB, free: 530.0 MB)
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2227, in countApprox
>     return int(drdd.sumApprox(timeout, confidence))
>   File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/rdd.py", line 2243, in sumApprox
>     r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
>   File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/usr/hdp/2.3.4.0-3485/spark/python/pyspark/sql/utils.py", line 36, in deco
>     return f(*a, **kw)
>   File "/usr/hdp/2.3.4.0-3485/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o69.sumApprox.
> : org.apache.commons.math3.exception.NotStrictlyPositiveException: degrees of freedom (0)
> 	at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:120)
> 	at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:86)
> 	at org.apache.commons.math3.distribution.TDistribution.<init>(TDistribution.java:63)
> 	at org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:61)
> 	at org.apache.spark.partial.SumEvaluator.currentResult(SumEvaluator.scala:29)
> 	at org.apache.spark.partial.ApproximateActionListener.awaitResult(ApproximateActionListener.scala:79)
> 	at org.apache.spark.scheduler.DAGScheduler.runApproximateJob(DAGScheduler.scala:586)
> 	at org.apache.spark.SparkContext.runApproximateJob(SparkContext.scala:1962)
> 	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:99)
> 	at org.apache.spark.rdd.DoubleRDDFunctions$$anonfun$sumApprox$1.apply(DoubleRDDFunctions.scala:96)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
> 	at org.apache.spark.rdd.DoubleRDDFunctions.sumApprox(DoubleRDDFunctions.scala:96)
> 	at org.apache.spark.api.java.JavaDoubleRDD.sumApprox(JavaDoubleRDD.scala:224)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:497)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> 	at py4j.Gateway.invoke(Gateway.java:259)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:207)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Note that this only happens occasionally, as befits a probabilistic counting method. A good way to reproduce is:
> {code}
> rdd = sc.parallelize([1]); for x in xrange(1000): rdd.countApprox(1+x,0.5)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org