Posted to issues@spark.apache.org by "Gerard Maas (JIRA)" <ji...@apache.org> on 2019/06/12 10:51:00 UTC

[jira] [Commented] (SPARK-28025) HDFSBackedStateStoreProvider should not leak .crc files

    [ https://issues.apache.org/jira/browse/SPARK-28025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861991#comment-16861991 ] 

Gerard Maas commented on SPARK-28025:
-------------------------------------

I reproduced the issue in a spark-shell session:
{code:java}
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

scala> import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming._

scala> val hadoopConf = spark.sparkContext.hadoopConfiguration

scala> import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf

scala> SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key
res1: String = spark.sql.streaming.checkpointFileManagerClass

scala> hadoopConf.get(SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key)
res2: String = null
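
// res2 is null: no custom spark.sql.streaming.checkpointFileManagerClass is set,
// so CheckpointFileManager.create below falls back to the default implementation.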

// mount point for the shared PVC: /storage
scala> val glusterCpfm = new org.apache.hadoop.fs.Path("/storage/crc-store")
glusterCpfm: org.apache.hadoop.fs.Path = /storage/crc-store

scala> val glusterfm = CheckpointFileManager.create(glusterCpfm, hadoopConf)
glusterfm: org.apache.spark.sql.execution.streaming.CheckpointFileManager = org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager@28d00f54

scala> glusterfm.isLocal
res17: Boolean = true

scala> glusterfm.mkdirs(glusterCpfm)

scala> val atomicFile = glusterfm.createAtomic(new org.apache.hadoop.fs.Path("/storage/crc-store/file.log"), false)
atomicFile: org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream = org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@1c6e065

scala> atomicFile.writeChars("Hello, World")

scala> atomicFile.close
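
// createAtomic wrote to a hidden temp file (.file.log.<uuid>.tmp) and close()
// committed it by renaming it to file.log. On the local (checksum) file system
// each write also creates a ".<name>.crc" sidecar; the listing below suggests
// the rename moves only the data file, leaving the temp file's .crc behind.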

/**
* Inspect the file system
*
* $ cat file.log
* Hello, World
* $ ls -al
* total 5
* drwxr-sr-x. 2 jboss 2000 85 Jun 12 09:44 .
* drwxrwsr-x. 8 root 2000 4096 Jun 12 09:42 ..
* -rw-r--r--. 1 jboss 2000 12 Jun 12 09:44 ..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
* -rw-r--r--. 1 jboss 2000 24 Jun 12 09:44 file.log
*/

// Delete the file -- simulate the operation done by the HDFSBackedStateStoreProvider#cleanup

scala> glusterfm.delete(new org.apache.hadoop.fs.Path("/storage/crc-store/file.log"))
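
// The delete targets only file.log; nothing ever references the leftover
// ..file.log.<uuid>.tmp.crc, so it stays behind -- this is the leaked file.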

/**
* Inspect the file system -> .crc file left behind
* $ ls -al
* total 9
* drwxr-sr-x. 2 jboss 2000 4096 Jun 12 09:46 .
* drwxrwsr-x. 8 root 2000 4096 Jun 12 09:42 ..
* -rw-r--r--. 1 jboss 2000 12 Jun 12 09:44 ..file.log.c6f90863-77d2-494e-b1cc-0d0ed1344f74.tmp.crc
*/
{code}
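
As a stop-gap until the provider cleans these up, a periodic sweep of the state store directory keeps the file count down. Below is a minimal sketch (the helper `cleanOrphanedCrc` is hypothetical, and it assumes the directory lives on the local file system, as in the session above): it lists the directory through the raw local file system, since the checksum layer filters .crc files out of listings, and deletes every '.name.crc' sidecar whose data file no longer exists.
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: delete ".<name>.crc" sidecars whose data file is gone.
// Uses the raw local FS because the checksum layer hides .crc files.
def cleanOrphanedCrc(dir: Path, conf: Configuration): Unit = {
  val fs = FileSystem.getLocal(conf).getRawFileSystem
  fs.listStatus(dir).filter(_.isFile).map(_.getPath).foreach { p =>
    val name = p.getName
    if (name.startsWith(".") && name.endsWith(".crc")) {
      // ".foo.crc" is the checksum sidecar for "foo"
      val dataFile = new Path(dir, name.stripPrefix(".").stripSuffix(".crc"))
      if (!fs.exists(dataFile)) fs.delete(p, false)
    }
  }
}

cleanOrphanedCrc(new Path("/storage/crc-store"), spark.sparkContext.hadoopConfiguration)
{code}
This only mitigates the symptom; running it against a live query's directory could race with in-flight atomic writes, so it should target quiesced directories.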

> HDFSBackedStateStoreProvider should not leak .crc files 
> --------------------------------------------------------
>
>                 Key: SPARK-28025
>                 URL: https://issues.apache.org/jira/browse/SPARK-28025
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.3
>         Environment: Spark 2.4.3
> Kubernetes 1.11(?) (OpenShift)
> StateStore storage on a mounted PVC. Viewed as a local filesystem by the `FileContextBasedCheckpointFileManager` : 
> {noformat}
> scala> glusterfm.isLocal
> res17: Boolean = true{noformat}
>            Reporter: Gerard Maas
>            Priority: Major
>
> When using the default CheckpointFileManager, the HDFSBackedStateStoreProvider leaves '.crc' files behind: one .crc file is created for each `createAtomic` operation of the CheckpointFileManager.
> Over time, the number of files becomes very large. It makes the state store file system grow continuously and, in our case, degrades file system performance.
> Here's a sample of one of our Spark storage volumes after 2 days of execution (4 stateful streaming jobs, each on a different sub-dir):
> {noformat}
> Total files in PVC (used for checkpoints and state store)
> $ find . | wc -l
> 431796
> # .crc files
> $ find . -name "*.crc" | wc -l
> 418053{noformat}
> With each .crc file taking up at least one storage block, the wasted storage runs into gigabytes.
> These jobs are running on Kubernetes. Our shared storage provider, GlusterFS, shows serious performance deterioration with this large number of files:
> {noformat}
> DEBUG HDFSBackedStateStoreProvider: fetchFiles() took 29164ms{noformat}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org