You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Lucy Yu (JIRA)" <ji...@apache.org> on 2017/03/24 20:29:41 UTC
[jira] [Commented] (SPARK-19476) Running threads in Spark DataFrame
foreachPartition() causes NullPointerException
[ https://issues.apache.org/jira/browse/SPARK-19476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941079#comment-15941079 ]
Lucy Yu commented on SPARK-19476:
---------------------------------
I believe we've seen a similar issue raised here:
https://github.com/memsql/memsql-spark-connector/issues/31
It seems that
{code}
sqlDf.foreachPartition(partition => {
new Thread(new Runnable {
override def run(): Unit = {
for (row <- partition) {
// do nothing here, just force the partition to be fully iterated over
}
}
}).start()
})
{code}
results in
{code}
java.lang.NullPointerException
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:287)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillMerger$2.loadNext(UnsafeSorterSpillMerger.java:86)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(UnknownSource)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextRow(WindowExec.scala:301)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.fetchNextPartition(WindowExec.scala:361)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:391)
at org.apache.spark.sql.execution.window.WindowExec$$anonfun$14$$anon$1.next(WindowExec.scala:290)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(UnknownSource)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.cainc.data.etl.lake.TestMartOutput$$anonfun$main$3$$anon$1.run(TestMartOutput.scala:42)
at java.lang.Thread.run(Thread.java:745)
{code}
> Running threads in Spark DataFrame foreachPartition() causes NullPointerException
> ---------------------------------------------------------------------------------
>
> Key: SPARK-19476
> URL: https://issues.apache.org/jira/browse/SPARK-19476
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.0, 1.6.1, 1.6.2, 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Reporter: Gal Topper
>
> First reported on [Stack overflow|http://stackoverflow.com/questions/41674069/running-threads-in-spark-dataframe-foreachpartition].
> I use multiple threads inside foreachPartition(), which works great for me except for when the underlying iterator is TungstenAggregationIterator. Here is a minimal code snippet to reproduce:
> {code:title=Reproduce.scala|borderStyle=solid}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration.Duration
> import scala.concurrent.{Await, Future}
> import org.apache.spark.SparkContext
> import org.apache.spark.sql.SQLContext
> object Reproduce extends App {
> val sc = new SparkContext("local", "reproduce")
> val sqlContext = new SQLContext(sc)
> import sqlContext.implicits._
> val df = sc.parallelize(Seq(1)).toDF("number").groupBy("number").count()
> df.foreachPartition { iterator =>
> val f = Future(iterator.toVector)
> Await.result(f, Duration.Inf)
> }
> }
> {code}
> When I run this, I get:
> {noformat}
> java.lang.NullPointerException
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:751)
> at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.next(TungstenAggregationIterator.scala:84)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> {noformat}
> I believe I actually understand why this happens - TungstenAggregationIterator uses a ThreadLocal variable that returns null when called from a thread other than the original thread that got the iterator from Spark. From examining the code, this does not appear to differ between recent Spark versions.
> However, this limitation is specific to TungstenAggregationIterator, and not documented, as far as I'm aware.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org