Posted to issues@spark.apache.org by "Puneet Sharma (Jira)" <ji...@apache.org> on 2022/10/07 09:15:00 UTC

[jira] [Updated] (SPARK-40700) Spark S3 Checkpointing error after enabling RocksDB

     [ https://issues.apache.org/jira/browse/SPARK-40700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Puneet Sharma updated SPARK-40700:
----------------------------------
    Affects Version/s: 3.2.1
                           (was: 3.2.0)

> Spark S3 Checkpointing error after enabling RocksDB
> ---------------------------------------------------
>
>                 Key: SPARK-40700
>                 URL: https://issues.apache.org/jira/browse/SPARK-40700
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.2.1
>         Environment: AWS ROSA OpenShift, AWS S3, RocksDB
>            Reporter: Puneet Sharma
>            Priority: Major
>
> We are running a stateful Spark Structured Streaming application on an OpenShift cluster, using Amazon S3 for checkpointing. RocksDB has also been enabled via the configuration "spark.sql.streaming.stateStore.providerClass" => "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider".
> However, we are seeing intermittent errors when checkpointing state to S3, and as a result the executor pods go into an error state.
>  
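> For reference, here is a minimal sketch of how the state store provider and the S3 checkpoint location are configured (the DataFrame name and sink are illustrative placeholders, not our actual code):
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> val spark = SparkSession.builder()
>   .appName("event-analytics-engine") // illustrative app name
>   // Enable the RocksDB-backed state store provider (available since Spark 3.2).
>   .config("spark.sql.streaming.stateStore.providerClass",
>     "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
>   .getOrCreate()
>
> // Stateful query checkpointing directly to S3 via the s3a connector
> // (bucket and path taken from the error log below).
> val query = aggregatedDf.writeStream // aggregatedDf is a placeholder stateful stream
>   .outputMode("update")
>   .option("checkpointLocation",
>     "s3a://event-analytics-engine-4-bucket/checkpoint_violationAggregation")
>   .format("console") // sink is illustrative
>   .start()
> {code}
>  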
>  
> {{2022-09-30T07:41:12+0000 [ERROR] [RocksDBFileManager StateStoreId(opId=0,partId=15,name=default)]: Error zipping to s3a://event-analytics-engine-4-bucket/checkpoint_violationAggregation/state/0/15/4.zip
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/OPTIONS-000012
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/CURRENT
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/000010.log
>     /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/metadata
> java.io.FileNotFoundException: /var/data/spark-32e2d20b-fdbe-4b73-8509-4bedfcc8e9f4/spark-5b602907-bc1c-4f2c-bac1-045935d6d772/StateStoreId(opId=0,partId=15,name=default)-9ad61263-c366-48d1-a79e-e4f17a3109b2/checkpoint-1b31b396-8e79-4496-8f8d-8114217ea015/MANIFEST-000009 (No such file or directory)
>     at java.io.FileInputStream.open(FileInputStream.java:212) ~[?:1.8.0]
>     at java.io.FileInputStream.<init>(FileInputStream.java:152) ~[?:1.8.0]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1(RocksDBFileManager.scala:442) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.$anonfun$zipToDfsFile$1$adapted(RocksDBFileManager.scala:440) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:440) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:172) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$commit$12(RocksDB.scala:265) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDB.timeTakenMs(RocksDB.scala:479) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDB.commit(RocksDB.scala:265) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.commit(RocksDBStateStoreProvider.scala:93) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$5(FlatMapGroupsWithStateExec.scala:190) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:142) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:142) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.timeTakenMs(FlatMapGroupsWithStateExec.scala:53) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$processDataWithPartition$4(FlatMapGroupsWithStateExec.scala:190) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) ~[scala-library-2.12.15.jar:?]
>     at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1498) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1408) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1472) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1295) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:335) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55) ~[spark-sql_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:337) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.scheduler.Task.run(Task.scala:131) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) [spark-core_2.12-3.2.1.jar:3.2.1]
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) [spark-core_2.12-3.2.1.jar:3.2.1]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1160) [?:1.8.0]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:1.8.0]
>     at java.lang.Thread.run(Thread.java:830) [?:2.9 (05-11-2022)]
> 2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Putting block rdd_1342_15 failed due to exception java.lang.NullPointerException.
> 2022-09-30T07:41:12+0000 [WARN] [BlockManager]: Block rdd_1342_15 could not be removed as it was not found on disk or in memory}}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org