Posted to issues@spark.apache.org by "Jim Huang (Jira)" <ji...@apache.org> on 2020/06/15 22:57:00 UTC

[jira] [Updated] (SPARK-31995) Spark Structured Streaming CheckpointFileManager ERROR when HDFS DFSOutputStream.completeFile fails with IOException: unable to close file because the last block does not have enough number of replicas

     [ https://issues.apache.org/jira/browse/SPARK-31995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jim Huang updated SPARK-31995:
------------------------------
    Description: 
I am using Spark 2.4.5's Structured Streaming in a YARN cluster running on Hadoop 2.7.3.  I have been running Spark Structured Streaming in this environment for several months without issue, until this new corner case left my Structured Streaming job in a partially working state.

 

I have included the ERROR message and stack trace below.  I did a quick search using the string "MicroBatchExecution: Query terminated with error" but did not find any existing Jira issue that matches my stack trace.

 

Based on a naive reading of this error message and stack trace, could Spark's CheckpointFileManager handle this HDFS exception more gracefully, for example by simply waiting a little longer for HDFS's write pipeline to finish replicating the last block?
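
To illustrate what I mean, here is a rough sketch of the kind of wait-and-retry I am imagining around writing and closing one metadata batch file.  Nothing here is from the Spark code base; the helper name, retry count, backoff, and error-message check are all made up for illustration:
{code:scala}
import java.io.IOException

// Hypothetical helper (not from Spark): retry a "write and close the batch file"
// action when HDFS reports that the last block does not yet have enough replicas,
// giving the write pipeline a little more time before failing the streaming query.
def withReplicationRetry[T](maxAttempts: Int = 5, backoffMs: Long = 2000L)(writeBatch: => T): T = {
  def attempt(n: Int): T =
    try writeBatch
    catch {
      case e: IOException
          if n < maxAttempts && e.getMessage != null &&
             e.getMessage.contains("enough number of replicas") =>
        Thread.sleep(backoffMs * n) // simple linear backoff before the next attempt
        attempt(n + 1)
    }
  attempt(1)
}

// Hypothetical usage around the existing write path, e.g.:
// withReplicationRetry() { /* write and close one metadata batch file */ }
{code}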

 

Being new to this code, where can I find the configuration parameter that sets the replica count for `streaming.HDFSMetadataLog`?  I am just trying to understand whether the current code already provides some configuration tuning variable(s) that would let it handle this IOException more gracefully.  Hopefully experts can provide some pointers or directions.
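
For what it is worth, my current working assumption (which I have not verified) is that these metadata log files simply use the filesystem's default replication (dfs.replication), and that the closest client-side knob to this failure is the HDFS setting dfs.client.block.write.locateFollowingBlock.retries, which as far as I can tell controls how many times the DFS client retries completeFile before throwing this IOException.  If that is right, something like the following, passed through Spark's spark.hadoop.* prefix, might be a workaround:
{code:scala}
import org.apache.spark.sql.SparkSession

// Untested workaround idea: raise the HDFS client's completeFile retry count so the
// client waits longer for the last block's replicas before giving up with this IOException.
// Any "spark.hadoop.*" key is copied into the Hadoop Configuration that Spark uses.
val spark = SparkSession.builder()
  .appName("my-structured-streaming-job") // placeholder app name
  // HDFS client setting (default 5, if I read the Hadoop 2.7 code correctly); 10 is just an example.
  .config("spark.hadoop.dfs.client.block.write.locateFollowingBlock.retries", "10")
  .getOrCreate()
{code}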

 
{code:java}
20/06/12 20:14:15 ERROR MicroBatchExecution: Query [id = yarn-job-id-redacted, runId = run-id-redacted] terminated with error
 java.io.IOException: Unable to close file because the last block does not have enough number of replicas.
 at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2511)
 at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2472)
 at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2437)
 at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
 at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:547)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
 at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
 at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
 at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193){code}
 

 

  was:
I am using Spark 2.4.5's Structured Streaming in a YARN cluster running on Hadoop 2.7.3.  I have been running Spark Structured Streaming in this environment for several months without issue, until this new corner case left my Structured Streaming job in a partially working state.

 

I have included the ERROR message and stack trace below.  I did a quick search using the string "MicroBatchExecution: Query terminated with error" but did not find any existing Jira issue that matches my stack trace.

 

Based on a naive reading of this error message and stack trace, could Spark's CheckpointFileManager handle this HDFS exception more gracefully, for example by simply waiting a little longer for HDFS's write pipeline to finish replicating the last block?

 

Being new to this code, where can I find the configuration parameter that sets the replica count for `streaming.HDFSMetadataLog`?  I am just trying to understand whether the current code already provides some configuration tuning variable(s) that would let it handle this IOException more gracefully.  Hopefully experts can provide some pointers or directions.

 

```

20/06/12 20:14:15 ERROR MicroBatchExecution: Query [id = yarn-job-id-redacted, runId = run-id-redacted] terminated with error
java.io.IOException: Unable to close file because the last block does not have enough number of replicas.
 at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2511)
 at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2472)
 at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2437)
 at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
 at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
 at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
 at scala.Option.getOrElse(Option.scala:121)
 at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:547)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
 at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
 at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
 at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
 at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
 at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

```

 


> Spark Structured Streaming CheckpointFileManager ERROR when HDFS DFSOutputStream.completeFile fails with IOException: unable to close file because the last block does not have enough number of replicas
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-31995
>                 URL: https://issues.apache.org/jira/browse/SPARK-31995
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.5
>         Environment: Apache Spark 2.4.5 without Hadoop
> Hadoop 2.7.3 - YARN cluster
> delta-core_2.11:0.6.1
>  
>            Reporter: Jim Huang
>            Priority: Major
>
> I am using Spark 2.4.5's Structured Streaming in a YARN cluster running on Hadoop 2.7.3.  I have been running Spark Structured Streaming in this environment for several months without issue, until this new corner case left my Structured Streaming job in a partially working state.
>  
> I have included the ERROR message and stack trace below.  I did a quick search using the string "MicroBatchExecution: Query terminated with error" but did not find any existing Jira issue that matches my stack trace.
>  
> Based on a naive reading of this error message and stack trace, could Spark's CheckpointFileManager handle this HDFS exception more gracefully, for example by simply waiting a little longer for HDFS's write pipeline to finish replicating the last block?
>  
> Being new to this code, where can I find the configuration parameter that sets the replica count for `streaming.HDFSMetadataLog`?  I am just trying to understand whether the current code already provides some configuration tuning variable(s) that would let it handle this IOException more gracefully.  Hopefully experts can provide some pointers or directions.
>  
> {code:java}
> 20/06/12 20:14:15 ERROR MicroBatchExecution: Query [id = yarn-job-id-redacted, runId = run-id-redacted] terminated with error
>  java.io.IOException: Unable to close file because the last block does not have enough number of replicas.
>  at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2511)
>  at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2472)
>  at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2437)
>  at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>  at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>  at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145)
>  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:126)
>  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
>  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
>  at scala.Option.getOrElse(Option.scala:121)
>  at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:110)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply$mcV$sp(MicroBatchExecution.scala:547)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:545)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:557)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:545)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>  at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193){code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org