Posted to issues@spark.apache.org by "phan minh duc (JIRA)" <ji...@apache.org> on 2019/06/11 02:20:00 UTC

[jira] [Issue Comment Deleted] (SPARK-20894) Error while checkpointing to HDFS

     [ https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

phan minh duc updated SPARK-20894:
----------------------------------
    Comment: was deleted

(was: I'm using Spark 2.4.0 and facing the same issue when I submit a Structured Streaming app on a cluster with 2 executors, but the error does not appear if I deploy on only 1 executor.

EDIT: Even when running with only 1 executor I'm still facing the same issue. Every checkpoint location I use is in HDFS, yet HDFSBackedStateStoreProvider reports an error about reading the .delta state file in /tmp.

Part of my log:

2019-06-10 02:47:21 WARN  TaskSetManager:66 - Lost task 44.1 in stage 92852.0 (TID 305080, 10.244.2.205, executor 2): java.lang.IllegalStateException: Error reading delta file file:/tmp/temporary-06b7ccbd-b9d4-438b-8ed9-8238031ef075/state/2/44/1.delta of HDFSStateStoreProvider[id = (op=2,part=44),dir = file:/tmp/temporary-06b7ccbd-b9d4-438b-8ed9-8238031ef075/state/2/44]: file:/tmp/temporary-06b7ccbd-b9d4-438b-8ed9-8238031ef075/state/2/44/1.delta does not exist
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:427)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply$mcVJ$sp(HDFSBackedStateStoreProvider.scala:384)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6$$anonfun$apply$1.apply(HDFSBackedStateStoreProvider.scala:383)
    at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:73)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:383)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$$anonfun$6.apply(HDFSBackedStateStoreProvider.scala:356)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:535)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.loadMap(HDFSBackedStateStoreProvider.scala:356)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.getStore(HDFSBackedStateStoreProvider.scala:204)
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:371)
    at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:88)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: file:/tmp/temporary-06b7ccbd-b9d4-438b-8ed9-8238031ef075/state/2/44/1.delta
    at org.apache.hadoop.fs.RawLocalFileSystem.open(RawLocalFileSystem.java:200)
    at org.apache.hadoop.fs.DelegateToFileSystem.open(DelegateToFileSystem.java:183)
    at org.apache.hadoop.fs.AbstractFileSystem.open(AbstractFileSystem.java:628)
    at org.apache.hadoop.fs.FilterFs.open(FilterFs.java:205)
    at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:795)
    at org.apache.hadoop.fs.FileContext$6.next(FileContext.java:791)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext.open(FileContext.java:797)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.open(CheckpointFileManager.scala:322)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$updateFromDeltaFile(HDFSBackedStateStoreProvider.scala:424)
    ... 28 more)
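
Worth noting for the deleted comment above: the state directory in the trace is file:/tmp/temporary-06b7ccbd-.../state/2/44, i.e. an auto-generated temporary checkpoint on the local filesystem, not an HDFS path. Spark falls back to such a "temporary-<uuid>" directory when a query ends up without a usable checkpoint location, and delta files written there by one executor (or one task attempt) are not visible to another, which would explain the "does not exist" failure above. Below is a minimal sketch of pinning the checkpoint to a fully qualified HDFS URI; the rate source, namenode address, and path are placeholder assumptions, not values taken from this report.

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;
    import org.apache.spark.sql.streaming.StreamingQueryException;

    public class HdfsCheckpointSketch {
      public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder().appName("hdfs-checkpoint-sketch").getOrCreate();

        // Placeholder source; the point of the sketch is only the checkpoint URI below.
        Dataset<Row> events = spark.readStream().format("rate").load();

        StreamingQuery query = events
            .writeStream()
            .format("console")
            .outputMode("append")
            // Fully qualified HDFS URI (host/port/path are hypothetical), so offsets and
            // state deltas land on HDFS rather than in file:/tmp/temporary-... on one node.
            .option("checkpointLocation", "hdfs://namenode:8020/user/spark/checkpoints/rate-sketch")
            .start();

        query.awaitTermination();
      }
    }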

> Error while checkpointing to HDFS
> ---------------------------------
>
>                 Key: SPARK-20894
>                 URL: https://issues.apache.org/jira/browse/SPARK-20894
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.1.1
>         Environment: Ubuntu, Spark 2.1.1, hadoop 2.7
>            Reporter: kant kodali
>            Assignee: Shixiong Zhu
>            Priority: Major
>             Fix For: 2.3.0
>
>         Attachments: driver_info_log, executor1_log, executor2_log
>
>
> Dataset<Row> df2 = df1
>     .groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName"))
>     .count();
> StreamingQuery query = df2
>     .writeStream()
>     .foreach(new KafkaSink())
>     .option("checkpointLocation", "/usr/local/hadoop/checkpoint")
>     .outputMode("update")
>     .start();
> query.awaitTermination();
> For some reason this fails with the error:
> ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalStateException: Error reading delta file /usr/local/hadoop/checkpoint/state/0/0/1.delta of HDFSStateStoreProvider[id = (op=0, part=0), dir = /usr/local/hadoop/checkpoint/state/0/0]: /usr/local/hadoop/checkpoint/state/0/0/1.delta does not exist
> I cleared all the checkpoint data in /usr/local/hadoop/checkpoint/ and all consumer offsets in Kafka from all brokers prior to running, yet this error still persists.
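
The checkpoint path in the description, /usr/local/hadoop/checkpoint, has no scheme, so it is resolved against the cluster's default filesystem (fs.defaultFS); if that resolution differs between the driver and the executors, the state deltas can end up on a filesystem the failing task cannot see. A small, hedged way to confirm where the path actually resolves (and whether the reported 1.delta exists there) is the Hadoop FileSystem API; the class name below is purely illustrative.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class CheckStateDir {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same unqualified path as in the report; Hadoop resolves it against fs.defaultFS.
        Path delta = new Path("/usr/local/hadoop/checkpoint/state/0/0/1.delta");
        FileSystem fs = delta.getFileSystem(conf);
        System.out.println("Resolved filesystem: " + fs.getUri());
        System.out.println("Qualified path:      " + fs.makeQualified(delta));
        System.out.println("Delta file exists:   " + fs.exists(delta));
      }
    }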



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org