Posted to issues@spark.apache.org by "yzheng616 (JIRA)" <ji...@apache.org> on 2017/08/14 02:54:00 UTC

[jira] [Created] (SPARK-21721) Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable

yzheng616 created SPARK-21721:
---------------------------------

             Summary: Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
                 Key: SPARK-21721
                 URL: https://issues.apache.org/jira/browse/SPARK-21721
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.1
            Reporter: yzheng616


The leak comes from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118 it puts a staging path into the FileSystem deleteOnExit cache, and at line 385 it removes the path from disk, but it never removes the path from the FileSystem cache. If a streaming application keeps persisting data to a partitioned Hive table, memory usage keeps increasing until the JVM terminates.
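For context, the relevant Hadoop FileSystem behavior can be shown in isolation with the sketch below (a minimal illustration against the local file system, not Spark code; the object name and path are just examples): deleteOnExit() records the path in an internal set, delete() only removes the data on disk, and only cancelDeleteOnExit() removes the cached entry.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object DeleteOnExitCacheSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    //example staging-like path, local file system only
    val staging = new Path("/tmp/.hive-staging_example")

    fs.mkdirs(staging)
    //registers the path in the FileSystem's internal deleteOnExit set
    fs.deleteOnExit(staging)
    //removes the directory from disk, but the entry stays in the set
    fs.delete(staging, true)
    //only this call removes the cached entry and releases the reference
    fs.cancelDeleteOnExit(staging)
  }
}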

Below is a simple program that reproduces the leak.

package test

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SaveMode
import java.lang.reflect.Field



case class PathLeakTest(id: Int, gp: String)

object StagePathLeak {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    //create a partitioned table
    spark.sql("drop table if exists path_leak");
    spark.sql("create table if not exists path_leak(id int)" +
        " partitioned by (gp String)"+
      " row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
      " stored as"+
        " inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
        " outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")

    val seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
    for (x <- 1 to 2) {
      seq += PathLeakTest(x, "g" + x)
    }
    val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)

    //insert into the Hive table twice; each overwrite creates a new staging path
    for (j <- 1 to 2) {
      val df = spark.createDataFrame(rdd)
      //#1 InsertIntoHiveTable line 118: adds the staging path to the FileSystem deleteOnExit cache
      //#2 InsertIntoHiveTable line 385: deletes the path from disk but not from the FileSystem cache, which causes the leak
      df.write.mode(SaveMode.Overwrite).insertInto("path_leak")
    }
    
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val deleteOnExit = getDeleteOnExit(fs.getClass)
    deleteOnExit.setAccessible(true)
    val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
    //check the FileSystem deleteOnExit cache size
    println(caches.size())
    val it = caches.iterator()
    //all staging paths are still cached even though they have already been deleted from disk
    while (it.hasNext()) {
      println(it.next())
    }
  }
  
  //walk up the class hierarchy until the private deleteOnExit field is found
  def getDeleteOnExit(cls: Class[_]): Field = {
    try {
      cls.getDeclaredField("deleteOnExit")
    } catch {
      case _: NoSuchFieldException => getDeleteOnExit(cls.getSuperclass)
    }
  }

}
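
A possible direction for a fix (only a sketch under the assumption that cancelDeleteOnExit is available in the Hadoop version in use; the helper name is made up, this is not the actual InsertIntoHiveTable code): after the staging directory is deleted at the end of the insert, the same path could also be dropped from the FileSystem cache so the entry does not accumulate.

  //hypothetical cleanup helper, not the actual Spark code
  def deleteStagingDir(fs: FileSystem, stagingPath: Path): Unit = {
    if (fs.exists(stagingPath)) {
      fs.delete(stagingPath, true)
    }
    //also remove the cached deleteOnExit entry, otherwise the Path object is retained
    fs.cancelDeleteOnExit(stagingPath)
  }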



 


