Posted to issues@spark.apache.org by "Avner Livne (Jira)" <ji...@apache.org> on 2020/09/08 15:43:01 UTC
[jira] [Comment Edited] (SPARK-24295) Purge Structured streaming FileStreamSinkLog metadata compact file data.
[ https://issues.apache.org/jira/browse/SPARK-24295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17192280#comment-17192280 ]
Avner Livne edited comment on SPARK-24295 at 9/8/20, 3:42 PM:
--------------------------------------------------------------
For those looking for a temporary workaround: run this code before you start the streaming query:
{code:java}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.sql.execution.streaming._

/**
 * regex to find the last compact file
 */
val file_pattern = """(hdfs://.*/_spark_metadata/\d+\.compact)""".r.unanchored

val fs = FileSystem.get(sc.hadoopConfiguration)

/**
 * implicit converter from Hadoop's RemoteIterator to a Scala Iterator
 */
implicit def convertToScalaIterator[T](underlying: RemoteIterator[T]): Iterator[T] = {
  case class Wrapper(underlying: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = underlying.hasNext
    override def next(): T = underlying.next()
  }
  Wrapper(underlying)
}

/**
 * delete a file or folder recursively
 */
def removePath(dstPath: String, fs: FileSystem): Unit = {
  val path = new Path(dstPath)
  if (fs.exists(path)) {
    println(s"deleting ${dstPath}...")
    fs.delete(path, true)
  }
}

/**
 * remove JSON entries older than `days` from the compact file,
 * preserving the `v1` version header at the head of the file,
 * then write the smaller file back to the original destination
 */
def compact(file: Path, days: Int = 20): Unit = {
  val ttl = new java.util.Date().getTime - java.util.concurrent.TimeUnit.DAYS.toMillis(days)
  val compacted_file = s"/tmp/${file.getName}"
  removePath(compacted_file, fs)
  val lines = sc.textFile(file.toString)
  val reduced_lines = lines.mapPartitions { p =>
    implicit val formats = DefaultFormats
    p.collect {
      case "v1" => "v1" // keep the version header
      case x if parse(x).extract[SinkFileStatus].modificationTime > ttl => x
    }
  }.coalesce(1)
  println(s"removing ${lines.count - reduced_lines.count} lines from ${file.toString}...")
  reduced_lines.saveAsTextFile(compacted_file)
  FileUtil.copy(fs, new Path(compacted_file + "/part-00000"), fs, file, false, sc.hadoopConfiguration)
  removePath(compacted_file, fs)
}

/**
 * get the last compact file, if one exists
 */
def getLastCompactFile(path: Path): Option[Path] = {
  fs.listFiles(path, true).toList.sortBy(_.getModificationTime).reverse.collectFirst {
    case x if file_pattern.findFirstMatchIn(x.getPath.toString).isDefined => x.getPath
  }
}

val my_folder = "/my/root/spark/structured/streaming/output/folder"
val metadata_folder = new Path(s"$my_folder/_spark_metadata")
getLastCompactFile(metadata_folder).foreach(x => compact(x, 20))

val df = spark
  .readStream
  .format("kafka") // ... whatever stream you like
{code}
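For reference, what the snippet rewrites: the compact file is a "v1" version line followed by one JSON-serialized SinkFileStatus per line, which is what the `parse(x).extract[SinkFileStatus]` call above consumes. A quick way to peek at it (the output lines in the comments below are illustrative, not real values):
{code:java}
// print the version header and the first entry of the last compact file
sc.textFile(getLastCompactFile(metadata_folder).get.toString).take(2).foreach(println)
// v1
// {"path":"hdfs://nn/out/part-00000-...-c000.snappy.parquet","size":1024,"isDir":false,"modificationTime":1599577381000,"blockReplication":3,"blockSize":134217728,"action":"add"}
{code}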
The example above retains SinkFileStatus entries from the last 20 days and purges everything else.
I run this code on driver startup, but it could certainly run async in some sidecar cronjob, e.g. along the lines of the sketch below.
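If you go the cronjob route, a minimal in-process variant is a background scheduler. This is only a sketch, reusing `getLastCompactFile`, `compact`, and `metadata_folder` from above; the one-hour interval is an arbitrary choice, and running it concurrently with an in-progress compaction by the sink is racy, so keep the interval conservative:
{code:java}
import java.util.concurrent.{Executors, TimeUnit}

// periodically purge old entries instead of doing it only at startup
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleWithFixedDelay(new Runnable {
  override def run(): Unit = {
    try {
      getLastCompactFile(metadata_folder).foreach(compact(_, 20))
    } catch {
      case e: Exception => println(s"metadata purge failed: ${e.getMessage}")
    }
  }
}, 1, 1, TimeUnit.HOURS) // initial delay 1h, then every hour
// remember to call scheduler.shutdown() when the application stops
{code}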
Tested on Spark 3.0.0 writing parquet files.
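For completeness, the write side this targets looks roughly like the sketch below: any file-format sink (parquet in my case) maintains the `_spark_metadata` log that the purge trims. Paths here are placeholders, and `df` is assumed to be a fully defined streaming DataFrame:
{code:java}
// hypothetical write side: the parquet sink keeps _spark_metadata under `path`
val query = df
  .writeStream
  .format("parquet")
  .option("path", my_folder) // same folder the purge targets
  .option("checkpointLocation", s"$my_folder/_checkpoint") // placeholder location
  .start()
{code}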
> Purge Structured streaming FileStreamSinkLog metadata compact file data.
> ------------------------------------------------------------------------
>
> Key: SPARK-24295
> URL: https://issues.apache.org/jira/browse/SPARK-24295
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0
> Reporter: Iqbal Singh
> Priority: Major
> Attachments: spark_metadatalog_compaction_perfbug_repro.tar.gz
>
>
> FileStreamSinkLog metadata logs are concatenated into a single compact file after a defined compact interval.
> For long-running jobs, the compact file can grow to tens of GBs, causing slowness when reading from the FileStreamSinkLog dir, since Spark defaults to the "_spark_metadata" dir for the read.
> We need functionality to purge compact file data.
>
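For reference, the sink-log settings that already exist in Spark's SQLConf sit next to this problem but do not solve it: they prune superseded batch log files and control compaction frequency, while nothing removes entries from inside the compact file itself. A sketch of those knobs (internal options; the values shown are their defaults):
{code:java}
// existing FileStreamSinkLog knobs, set before starting the query;
// none of them shrink the compact file, which is what this issue asks for
spark.conf.set("spark.sql.streaming.fileSink.log.deletion", "true")      // delete superseded batch files
spark.conf.set("spark.sql.streaming.fileSink.log.compactInterval", "10") // compact every 10 batches
spark.conf.set("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")   // grace period before deletion
{code}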