You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/15 05:29:21 UTC

spark git commit: [SPARK-21721][SQL] Clear FileSystem deleteOnExit cache when paths are successfully removed

Repository: spark
Updated Branches:
  refs/heads/master 282f00b41 -> 4c3cf1cc5


[SPARK-21721][SQL] Clear FileSystem deleteOnExit cache when paths are successfully removed

## What changes were proposed in this pull request?

We register the staging path in the deleteOnExit cache of `FileSystem` in case the path can't be removed successfully. But when we do remove the path successfully, we don't remove it from the cache. We should do so, to avoid continuously growing the cache.

## How was this patch tested?

Added a test.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #18934 from viirya/SPARK-21721.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c3cf1cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c3cf1cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c3cf1cc

Branch: refs/heads/master
Commit: 4c3cf1cc5cdb400ceef447d366e9f395cd87b273
Parents: 282f00b
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Mon Aug 14 22:29:15 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Aug 14 22:29:15 2017 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  8 ++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 22 ++++++++++++++++++--
 2 files changed, 27 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c3cf1cc/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b6f4898..858f29c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -423,7 +423,13 @@ case class InsertIntoHiveTable(
     // Attempt to delete the staging directory and the inclusive files. If failed, the files are
     // expected to be dropped at the normal termination of VM since deleteOnExit is used.
     try {
-      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
     } catch {
       case NonFatal(e) =>
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

http://git-wip-us.apache.org/repos/asf/spark/blob/4c3cf1cc/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a949e5e..45bbb0c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
-import java.util.Locale
+import java.util.{Locale, Set}
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.TestUtils
 import org.apache.spark.sql._
@@ -2021,4 +2021,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
     }
   }
+
+  test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") {
+    withTable("test21721") {
+      val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
+      deleteOnExitField.setAccessible(true)
+      // The `deleteOnExit` set is read reflectively from the FileSystem instance below.
+      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+      val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
+
+      val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
+      sql("CREATE TABLE test21721 (key INT, value STRING)")
+      val pathSizeToDeleteOnExit = setOfPath.size()
+      // Repeated inserts into the table created above must not grow the deleteOnExit set.
+      (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test21721"))
+
+      assert(setOfPath.size() == pathSizeToDeleteOnExit)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org