You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/15 15:48:04 UTC

spark git commit: [SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully removed

Repository: spark
Updated Branches:
  refs/heads/branch-2.1 9b749b6ce -> 6f366fbbf


[SPARK-21721][SQL][BACKPORT-2.1] Clear FileSystem deleteOnExit cache when paths are successfully removed

## What changes were proposed in this pull request?

Backport SPARK-21721 to branch 2.1:

We put staging path to delete into the deleteOnExit cache of FileSystem in case of the path can't be successfully removed. But when we successfully remove the path, we don't remove it from the cache. We should do it to avoid continuing grow the cache size.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #18947 from viirya/SPARK-21721-backport-2.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f366fbb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f366fbb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f366fbb

Branch: refs/heads/branch-2.1
Commit: 6f366fbbf8dc0a00050040891635e1caae8a4faa
Parents: 9b749b6
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Tue Aug 15 08:48:00 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Aug 15 08:48:00 2017 -0700

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    |  8 +++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 21 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3b9c2fc..3567819 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -382,7 +382,13 @@ case class InsertIntoHiveTable(
     // Attempt to delete the staging directory and the inclusive files. If failed, the files are
     // expected to be dropped at the normal termination of VM since deleteOnExit is used.
     try {
-      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+      createdTempDir.foreach { path =>
+        val fs = path.getFileSystem(hadoopConf)
+        if (fs.delete(path, true)) {
+          // If we successfully delete the staging directory, remove it from FileSystem's cache.
+          fs.cancelDeleteOnExit(path)
+        }
+      }
     } catch {
       case NonFatal(e) =>
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)

http://git-wip-us.apache.org/repos/asf/spark/blob/6f366fbb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 1619115..73ceaf8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,12 +20,13 @@ package org.apache.spark.sql.hive.execution
 import java.io.{File, PrintWriter}
 import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
+import java.util.Set
 
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.Try
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -2031,4 +2032,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       checkAnswer(table.filter($"p" === "p1\" and q=\"q1").select($"a"), Row(4))
     }
   }
+
+  test("SPARK-21721: Clear FileSystem deleterOnExit cache if path is successfully removed") {
+    withTable("test21721") {
+      val deleteOnExitField = classOf[FileSystem].getDeclaredField("deleteOnExit")
+      deleteOnExitField.setAccessible(true)
+
+      val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+      val setOfPath = deleteOnExitField.get(fs).asInstanceOf[Set[Path]]
+
+      val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
+      sql("CREATE TABLE test21721 (key INT, value STRING)")
+      val pathSizeToDeleteOnExit = setOfPath.size()
+
+      (0 to 10).foreach(_ => testData.write.mode(SaveMode.Append).insertInto("test1"))
+
+      assert(setOfPath.size() == pathSizeToDeleteOnExit)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org