Posted to issues@spark.apache.org by "Chenxi Zhao (Jira)" <ji...@apache.org> on 2021/03/26 00:09:00 UTC
[jira] [Comment Edited] (SPARK-31754) Spark Structured Streaming:
NullPointerException in Stream Stream join
[ https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309012#comment-17309012 ]
Chenxi Zhao edited comment on SPARK-31754 at 3/26/21, 12:08 AM:
----------------------------------------------------------------
I hit the same issue. I was running Spark 2.4.4 and doing a left outer join on data from a Kafka source, loading about 288 GB. Right after the join began, I started seeing this exception:
21/03/24 04:56:51 ERROR Utils: Aborting task
java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.And_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$10.apply(StreamingSymmetricHashJoinExec.scala:228)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:464)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:217)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:409)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:415)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Edit: This might be related to a corrupted checkpoint. After I deleted the checkpoint folder, this particular exception went away.
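For anyone trying the same workaround, it may be safer to move the checkpoint folder aside than to delete it outright, so it can still be inspected later. Below is a minimal sketch of that idea, assuming the checkpoint lives on a local filesystem reachable through java.nio (a checkpoint on HDFS or an object store would need that store's own API instead). The CheckpointArchiver object and its archive method are hypothetical names for illustration, not part of Spark.

```scala
import java.nio.file.{Files, Path}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// Hypothetical helper, not a Spark API: renames a checkpoint directory
// instead of deleting it. Assumes a local filesystem path.
object CheckpointArchiver {

  /** Renames `checkpointDir` to a timestamped sibling directory.
    * Returns the new path, or None if `checkpointDir` is not an existing directory.
    */
  def archive(checkpointDir: Path): Option[Path] = {
    if (!Files.isDirectory(checkpointDir)) {
      None
    } else {
      val stamp = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss"))
      // The target sits next to the original, so the move is a plain rename.
      val target = checkpointDir.resolveSibling(s"${checkpointDir.getFileName}.bak-$stamp")
      Some(Files.move(checkpointDir, target))
    }
  }
}
```

The query has to be stopped before the folder is moved. Note that starting without the old checkpoint discards the stored offsets and join state, so the query begins again from scratch.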
> Spark Structured Streaming: NullPointerException in Stream Stream join
> ----------------------------------------------------------------------
>
> Key: SPARK-31754
> URL: https://issues.apache.org/jira/browse/SPARK-31754
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
> Reporter: Puviarasu
> Priority: Major
> Labels: structured-streaming
> Attachments: CodeGen.txt, Excpetion-3.0.0Preview2.txt, Logical-Plan.txt
>
>
> When joining two streams with watermarking and windowing, we get a NullPointerException after the job has run for a few minutes.
> After the failure, we analyzed the checkpoint offsets/sources and identified the files on which the application failed. These files do not contain any null values in the join columns.
> We even restarted the job with only those files and the application ran successfully. From this we concluded that the exception is not caused by the data in the streams.
> *Code:*
>
> {code:java}
> import org.apache.spark.sql.streaming.Trigger
>
> // File-source options for the two input streams (one file per micro-batch).
> val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1" )
> val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2", "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" -> "false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1" )
> // Register both parquet streams as temporary views.
> spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
> spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
> // Drop rows with a null join key or event time, then set a 30-minute watermark on each side.
> spark.sql("select * from source1 where eventTime1 is not null and col1 is not null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
> spark.sql("select * from source2 where eventTime2 is not null and col2 is not null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
> // Inner join on the key, keeping rows where a's event time is between b's event time and 2 hours after it.
> spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours").createTempView("join")
> // Write the join result to a parquet sink in append mode, triggering every 5 seconds.
> val optionsMap3 = Map[String, String]("compression" -> "snappy","path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
> spark.sql("select * from join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 seconds")).format("parquet").options(optionsMap3).start()
> {code}
>
> *Exception:*
>
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Aborting TaskSet 4.0 because task 0 (partition 0)
> cannot run anywhere due to node and executor blacklist.
> Most recent failure:
> Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
> at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
> at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
> at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
> at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
> at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJ
> at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(St:361)
> at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
> at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
> at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
> at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
> at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> at org.apache.spark.scheduler.Task.run(Task.scala:121)
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Blacklisting behavior can be configured via spark.blacklist.*.
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
> at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
> ... 19 more
> Exception in thread "main" org.apache.spark.SparkException: Application application_2345 finished with failed status
> at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
> at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
> at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
> at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
> at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
> at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
> at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
>