You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Liang-Chi Hsieh (JIRA)" <ji...@apache.org> on 2017/08/14 04:23:00 UTC

[jira] [Commented] (SPARK-21721) Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable

    [ https://issues.apache.org/jira/browse/SPARK-21721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125217#comment-16125217 ] 

Liang-Chi Hsieh commented on SPARK-21721:
-----------------------------------------

Submitted a PR at https://github.com/apache/spark/pull/18934

> Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
> ----------------------------------------------------------------------
>
>                 Key: SPARK-21721
>                 URL: https://issues.apache.org/jira/browse/SPARK-21721
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: yzheng616
>
> The leak comes from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118, it adds a staging path to the FileSystem deleteOnExit cache, and then removes the path from disk at line 385. However, it never removes the path from the FileSystem cache. If a streaming application keeps persisting data to a partitioned Hive table, memory usage will keep increasing until the JVM terminates.
> Below is a simple program to reproduce it.
> {code:java}
> package test
> import org.apache.spark.sql.SparkSession
> import org.apache.hadoop.fs.Path
> import org.apache.hadoop.fs.FileSystem
> import org.apache.spark.sql.SaveMode
> import java.lang.reflect.Field
> case class PathLeakTest(id: Int, gp: String)
> object StagePathLeak {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
>     spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
>     //create a partitioned table
>     spark.sql("drop table if exists path_leak");
>     spark.sql("create table if not exists path_leak(id int)" +
>         " partitioned by (gp String)"+
>       " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
>       " stored as"+
>         " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
>         " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
>     var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
>     // 2 partitions
>     for (x <- 1 to 2) {
>       seq += (new PathLeakTest(x, "g" + x))
>     }
>     val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)
>     //run 50 overwrite inserts (2 records each) into the Hive table
>     for (j <- 1 to 50) {
>       val df = spark.createDataFrame(rdd)
>       //#1 InsertIntoHiveTable line 118:  add stage path to FileSystem deleteOnExit cache
>       //#2 InsertIntoHiveTable line 385:  delete the path from disk but not from the FileSystem cache, and it caused the leak
>       df.write.mode(SaveMode.Overwrite).insertInto("path_leak")  
>     }
>     
>     val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
>     val deleteOnExit = getDeleteOnExit(fs.getClass)
>     deleteOnExit.setAccessible(true)
>     val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
>     //check FileSystem deleteOnExit cache size
>     println(caches.size())
>     val it = caches.iterator()
>     //all staging paths are still cached even though they have already been deleted from disk
>     while(it.hasNext()){
>       println(it.next());
>     }
>   }
>   
>   def getDeleteOnExit(cls: Class[_]) : Field = {
>     try{
>        return cls.getDeclaredField("deleteOnExit")
>     }catch{
>       case ex: NoSuchFieldException => return getDeleteOnExit(cls.getSuperclass)
>     }
>     return null
>   }
> }
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org