Posted to issues@spark.apache.org by "Avner Livne (Jira)" <ji...@apache.org> on 2020/09/08 15:43:01 UTC

[jira] [Comment Edited] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.

    [ https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192280#comment-17192280 ] 

Avner Livne edited comment on SPARK-24295 at 9/8/20, 3:42 PM:
--------------------------------------------------------------

For those looking for a temporary workaround:

Run this code before you start the streaming query:

{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.execution.streaming._

/**
  * regex to find last compact file
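  * (assumes an hdfs:// path; adjust the scheme if the sink writes to s3a://, file://, etc.)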
  */
val file_pattern = """(hdfs://.*/_spark_metadata/\d+\.compact)""".r.unanchored
val fs = FileSystem.get(sc.hadoopConfiguration)

/**
  * implicit Hadoop RemoteIterator converter
  */
implicit def convertToScalaIterator[T](underlying: RemoteIterator[T]): Iterator[T] = {
    case class wrapper(underlying: RemoteIterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = underlying.hasNext
      override def next: T = underlying.next
    }
    wrapper(underlying)
  }

/**
  * delete file or folder recursively 
  */
def removePath(dstPath: String, fs: FileSystem): Unit = {
    val path = new Path(dstPath)
    if (fs.exists(path)) {
        println(s"deleting ${dstPath}...")
        fs.delete(path, true)
    }
}

/**
  * remove JSON entries older than `days` from the compact file
  * preserve the `v1` version header at the head of the file
  * rewrite the smaller file back to the original destination
  */
def compact(file: Path, days: Int = 20) = {
    val ttl = new java.util.Date().getTime - java.util.concurrent.TimeUnit.DAYS.toMillis(days)
    val compacted_file = s"/tmp/${file.getName.toString}"
    removePath(compacted_file, fs)
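    // the compact file is plain text: a "v1" version header on the first line, then one JSON SinkFileStatus entry per line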
    val lines = sc.textFile(file.toString)
    val reduced_lines = lines.mapPartitions({
        p => 
            implicit val formats = DefaultFormats
            p.collect({
                case "v1" => "v1"
                case x if { 
                    parse(x).extract[SinkFileStatus].modificationTime > ttl 
                } => x
            })
    }).coalesce(1)
    println(s"removing ${lines.count - reduced_lines.count} lines from ${file.toString}...")
    reduced_lines.saveAsTextFile(compacted_file)
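    // overwrite the original compact file with the single reduced part file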
    FileUtil.copy(fs, new Path(compacted_file + "/part-00000"), fs, file, false, sc.hadoopConfiguration)
    removePath(compacted_file, fs)
}

/**
  * get the last compact file, if one exists
  */
def getLastCompactFile(path: Path) = {
    fs.listFiles(path, true).toList.sortBy(_.getModificationTime).reverse.collectFirst({
        case x if (file_pattern.findFirstMatchIn(x.getPath.toString).isDefined) => 
            x.getPath
    })
}

val my_folder = "/my/root/spark/structured/streaming/output/folder"
val metadata_folder = new Path(s"$my_folder/_spark_metadata")
getLastCompactFile(metadata_folder).map(x => compact(x, 20))

val df = spark
      .readStream
      .format("kafka") ///..... whatever stream you like
{code}


This example retains SinkFileStatus entries from the last 20 days and purges everything else.
I run this code on driver startup, but it can certainly run asynchronously in a sidecar cron job; a sketch of that follows.
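
A minimal sketch of the scheduled "sidecar" variant, assuming the helpers above are in scope; the daily interval and the 20-day retention are just illustrative values:

{code:java}
import java.util.concurrent.{Executors, TimeUnit}

// run the metadata compaction once a day on a background thread
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    try {
      getLastCompactFile(metadata_folder).foreach(f => compact(f, 20))
    } catch {
      case e: Exception => println(s"metadata compaction failed: ${e.getMessage}")
    }
  }
}, 0, 24, TimeUnit.HOURS)

// stop the scheduler when the application shuts down
sys.addShutdownHook(scheduler.shutdown())
{code}

Note that the rewrite is not atomic, so when scheduling it next to a live query you may want to time it so it does not race with Spark's own compaction of the same file.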

Tested on Spark 3.0.0 writing Parquet files.
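
For context, a minimal sketch of the kind of file-sink query whose _spark_metadata folder the workaround trims, assuming `df` is the fully built streaming DataFrame (the readStream above is left unfinished on purpose); the output path, checkpoint location and trigger are placeholders:

{code:java}
import org.apache.spark.sql.streaming.Trigger

// the parquet file sink maintains the _spark_metadata folder under the output path;
// that is the folder compacted by the workaround above
val query = df
  .writeStream
  .format("parquet")
  .option("path", my_folder)
  .option("checkpointLocation", s"$my_folder/_checkpoint")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
{code}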




> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> ------------------------------------------------------------------------
>
>                 Key: SPARK-24295
>                 URL: https://issues.apache.org/jira/browse/SPARK-24295
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Iqbal Singh
>            Priority: Major
>         Attachments: spark_metadatalog_compaction_perfbug_repro.tar.gz
>
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file after a defined compact interval.
> For long-running jobs, the compact file can grow to tens of GBs, causing slowness while reading the data from the FileStreamSinkLog dir, since Spark defaults to the "_spark_metadata" dir for the read.
> We need functionality to purge the compact file data.
>  


