Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2017/03/26 19:32:42 UTC

[jira] [Updated] (SPARK-19476) Running threads in Spark DataFrame foreachPartition() causes NullPointerException

     [ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-19476:
------------------------------
      Priority: Minor  (was: Major)
    Issue Type: Improvement  (was: Bug)

You certainly access the iterator from Spark, right? That's what I mean. You might just copy its contents off into a data structure that has no relation to the iterator implementation. That, of course, may not be feasible.
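
A minimal sketch of that workaround, reusing the reporter's setup from the reproduction quoted below (the {{CopyFirstSketch}} name is just for illustration): drain the Spark iterator on the thread that received it, then hand the resulting plain Vector to the worker thread.

{code:title=CopyFirstSketch.scala|borderStyle=solid}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Sketch only: same job as the reproduction, but the iterator is fully
// consumed on Spark's task thread before any other thread touches the data.
object CopyFirstSketch extends App {
  val sc = new SparkContext("local", "copy-first-sketch")
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()

  df.foreachPartition { iterator =>
    val rows = iterator.toVector           // materialized on the task thread
    val f = Future(rows.map(_.toString))   // the Future only sees a plain Vector
    Await.result(f, Duration.Inf)
  }
}
{code}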

I think this is true across the whole API: you're not intended to create your own threads. It could well work in many cases, but I don't think it's guaranteed to. Sure, maybe worth a note.

> Running threads in Spark DataFrame foreachPartition() causes NullPointerException
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-19476
>                 URL: https://issues.apache.org/jira/browse/SPARK-19476
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Gal Topper
>            Priority: Minor
>
> First reported on [Stack overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me except when the underlying iterator is a TungstenAggregationIterator. Here is a minimal code snippet to reproduce the issue:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> 
> object Reproduce extends App {
>   val sc = new SparkContext("local", "reproduce")
>   val sqlContext = new SQLContext(sc)
>   import sqlContext.implicits._
> 
>   // groupBy().count() makes the partition iterator a TungstenAggregationIterator
>   val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
> 
>   df.foreachPartition { iterator =>
>     // The iterator is consumed on a Future's thread, not on Spark's task thread
>     val f = Future(iterator.toVector)
>     Await.result(f, Duration.Inf)
>   }
> }
> {code}
> When I run this, I get:
> {noformat}
>     java.lang.NullPointerException
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
>         at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I understand why this happens: TungstenAggregationIterator uses a ThreadLocal variable that returns null when accessed from a thread other than the one that originally obtained the iterator from Spark. From examining the code, this does not appear to differ between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator and is not documented, as far as I'm aware.
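> 
> As a rough illustration of that mechanism (a generic ThreadLocal, not Spark's actual code), a value set on one thread comes back as null when read from another:
> {code:title=ThreadLocalDemo.scala|borderStyle=solid}
> object ThreadLocalDemo extends App {
>   val local = new ThreadLocal[String]
>   local.set("task state")          // set on the main thread
>   println(local.get())             // prints "task state"
> 
>   val other = new Thread(new Runnable {
>     override def run(): Unit =
>       println(local.get())         // prints "null": nothing was set on this thread
>   })
>   other.start()
>   other.join()
> }
> {code}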


