You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "yzheng616 (JIRA)" <ji...@apache.org> on 2017/08/14 02:54:00 UTC
[jira] [Created] (SPARK-21721) Memory leak in
org.apache.spark.sql.hive.execution.InsertIntoHiveTable
yzheng616 created SPARK-21721:
---------------------------------
Summary: Memory leak in org.apache.spark.sql.hive.execution.InsertIntoHiveTable
Key: SPARK-21721
URL: https://issues.apache.org/jira/browse/SPARK-21721
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.1
Reporter: yzheng616
The leak came from org.apache.spark.sql.hive.execution.InsertIntoHiveTable. At line 118, it registers a staging path in the FileSystem deleteOnExit cache, and then removes the path from disk at line 385. It does not remove the path from the FileSystem deleteOnExit cache. If a streaming application keeps persisting data to a partitioned Hive table, memory usage keeps increasing until the JVM terminates.
Below is a simple program that reproduces it.
package test
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SaveMode
import java.lang.reflect.Field
/**
 * One row of the `path_leak` test table.
 *
 * @param id value for the `id int` column
 * @param gp value for the `gp` partition column
 */
final case class PathLeakTest(id: Int, gp: String)
object StagePathLeak {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[4]").appName("StagePathLeak").enableHiveSupport().getOrCreate()
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
//create a partitioned table
spark.sql("drop table if exists path_leak");
spark.sql("create table if not exists path_leak(id int)" +
" partitioned by (gp String)"+
" row format serde 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'"+
" stored as"+
" inputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'"+
" outputformat 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'")
var seq = new scala.collection.mutable.ArrayBuffer[PathLeakTest]()
for (x <- 1 to 2) {
seq += (new PathLeakTest(x, "g" + x))
}
val rdd = spark.sparkContext.makeRDD[PathLeakTest](seq)
//insert 50 records to Hive table
for (j <- 1 to 2) {
val df = spark.createDataFrame(rdd)
//#1 InsertIntoHiveTable line 118: add stage path to FileSystem deleteOnExit cache
//#2 InsertIntoHiveTable line 385: delete the path from disk but not from the FileSystem cache, and it caused the leak
df.write.mode(SaveMode.Overwrite).insertInto("path_leak")
}
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val deleteOnExit = getDeleteOnExit(fs.getClass)
deleteOnExit.setAccessible(true)
val caches = deleteOnExit.get(fs).asInstanceOf[java.util.TreeSet[Path]]
//check FileSystem deleteOnExit cache size
println(caches.size())
val it = caches.iterator()
//all starge pathes were still cached even they have already been deleted from the disk
while(it.hasNext()){
println(it.next());
}
}
def getDeleteOnExit(cls: Class[_]) : Field = {
try{
return cls.getDeclaredField("deleteOnExit")
}catch{
case ex: NoSuchFieldException => return getDeleteOnExit(cls.getSuperclass)
}
return null
}
}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org