Posted to issues@spark.apache.org by "Chenxi Zhao (Jira)" <ji...@apache.org> on 2021/03/26 00:09:00 UTC

[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming: NullPointerException in Stream Stream join

    [ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309012#comment-17309012 ] 

Chenxi Zhao edited comment on SPARK-31754 at 3/26/21, 12:08 AM:
----------------------------------------------------------------

I hit the same issue. I was using Spark 2.4.4 and doing a left outer join from a Kafka source, loading about 288 GB of data. As soon as the join state starts building, I immediately see the following exception:

 

21/03/24 04:56:51 ERROR Utils: Aborting task
 java.lang.NullPointerException
 at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
 at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
 at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
 at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
 at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
 at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:217)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:123)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
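
For context, my query was shaped roughly like this. This is only a minimal sketch, not the actual job: the broker address, topic names, key/timestamp columns, watermark durations, and the join time range are placeholders.

{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

val spark = SparkSession.builder.appName("stream-stream-left-outer-join").getOrCreate()

// Left side: Kafka topic, watermarked on its event time
val left = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING) AS key1", "timestamp AS eventTime1")
  .withWatermark("eventTime1", "30 minutes")

// Right side: second Kafka topic, also watermarked
val right = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topic2")
  .load()
  .selectExpr("CAST(key AS STRING) AS key2", "timestamp AS eventTime2")
  .withWatermark("eventTime2", "30 minutes")

// Stream-stream left outer join on the key plus an event-time range condition
val joined = left.join(
  right,
  expr("key1 = key2 AND eventTime1 >= eventTime2 AND eventTime1 <= eventTime2 + interval 2 hours"),
  "leftOuter")

joined.writeStream
  .format("parquet")
  .option("path", "/path/to/sink")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("append")
  .start()
{code}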

 

Edit: It might be related to a corrupted checkpoint. I deleted the checkpoint folder and this particular exception is gone.
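
A minimal sketch of that workaround, reusing the joined stream from the sketch above (the paths are placeholders; starting from a fresh checkpoint discards the accumulated join state and source offsets, so it is not a lossless fix):

{code:java}
// Restart the query against a new, empty checkpoint directory instead of the
// (possibly corrupted) old one. The buffered join state is lost, so some rows
// may need to be reprocessed or some matches may be missed.
joined.writeStream
  .format("parquet")
  .option("path", "/path/to/sink")
  .option("checkpointLocation", "/path/to/checkpoint_v2")  // fresh, empty directory
  .outputMode("append")
  .start()
{code}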


> Spark Structured Streaming: NullPointerException in Stream Stream join
> ----------------------------------------------------------------------
>
>                 Key: SPARK-31754
>                 URL: https://issues.apache.org/jira/browse/SPARK-31754
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
>            Reporter: Puviarasu
>            Priority: Major
>              Labels: structured-streaming
>         Attachments: CodeGen.txt, Excpetion-3.0.0Preview2.txt, Logical-Plan.txt
>
>
> When joining 2 streams with watermarking and windowing, we get a NullPointerException after the query has been running for a few minutes. 
> After the failure we analyzed the checkpoint offsets/sources and identified the files on which the application failed. These files do not have any null values in the join columns. 
> We even started the job with just these files and the application ran fine. From this we concluded that the exception is not caused by the data coming from the streams.
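>
> For reference, that null check can be done with a plain batch read over the suspect files (a minimal sketch; the file path is a placeholder and the column names are the join columns from the query below):
>
> {code:java}
> import org.apache.spark.sql.functions.col
>
> // Batch-read one of the files referenced in the checkpoint sources/offsets
> // and count rows whose join columns are null (this came back as 0 for us)
> val suspect = spark.read.parquet("/path/to/suspect-file.parquet")
> val badRows = suspect.filter(col("col1").isNull || col("eventTime1").isNull).count()
> println(s"rows with null join columns: $badRows")
> {code}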
> *Code:*
>  
> {code:java}
> val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" ->"false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1" )
>  val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2", "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" ->"false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1" )
>  spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
>  spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
>  spark.sql("select * from source1 where eventTime1 is not null and col1 is not null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
>  spark.sql("select * from source2 where eventTime2 is not null and col2 is not null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
>  spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours").createTempView("join")
>  val optionsMap3 = Map[String, String]("compression" -> "snappy","path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
>  spark.sql("select * from join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()
> {code}
>  
> *Exception:*
>  
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Aborting TaskSet 4.0 because task 0 (partition 0)
> cannot run anywhere due to node and executor blacklist.
> Most recent failure:
> Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>         at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
>         at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
>         at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>         at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
>         at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJ
>         at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(St:361)
>         at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
>         at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>         at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>         at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>         at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Blacklisting behavior can be configured via spark.blacklist.*.
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>         at scala.Option.foreach(Option.scala:257)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
>         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
>         ... 19 more
> Exception in thread "main" org.apache.spark.SparkException: Application application_2345 finished with failed status
>         at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
>         at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
>         at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
>         at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
>         at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
>         at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>         at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org