You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/12/15 01:24:00 UTC

spark git commit: [SPARK-18703][SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables

Repository: spark
Updated Branches:
  refs/heads/master 324388531 -> 8db4d95c0


[SPARK-18703][SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables

### What changes were proposed in this pull request?
Below are the files/directories generated for three inserts againsts a Hive table:
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```

The first 18 files are temporary. We do not drop it until the end of JVM termination. If JVM does not appropriately terminate, these temporary files/directories will not be dropped.

Only the last two files are needed, as shown below.
```
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc
/private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000
```
The temporary files/directories could accumulate a lot when we issue many inserts, since each insert generats at least six files. This could eat a lot of spaces and slow down the JVM termination. When the JVM does not terminates approprately, the files might not be dropped.

This PR is to drop the created staging files and temporary data files after each insert/CTAS.

### How was this patch tested?
Added a test case

Author: gatorsmile <ga...@gmail.com>

Closes #16134 from gatorsmile/deleteFiles.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8db4d95c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8db4d95c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8db4d95c

Branch: refs/heads/master
Commit: 8db4d95c02080cd120740d106292f0281e8f9249
Parents: 3243885
Author: gatorsmile <ga...@gmail.com>
Authored: Thu Dec 15 09:23:55 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Dec 15 09:23:55 2016 +0800

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 16 +++++++++++--
 .../spark/sql/hive/client/VersionsSuite.scala   | 24 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8db4d95c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 82c7b1a..aa858e8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
   def output: Seq[Attribute] = Seq.empty
 
   val hadoopConf = sessionState.newHadoopConf()
+  var createdTempDir: Option[Path] = None
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
   val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
       }
+      createdTempDir = Some(dir)
       fs.deleteOnExit(dir)
     } catch {
       case e: IOException =>
         throw new RuntimeException(
           "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-
     }
     return dir
   }
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
       if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
       }
+      createdTempDir = Some(dirPath)
       fs.deleteOnExit(dirPath)
     } catch {
       case e: IOException =>
         throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
-
     }
     dirPath
   }
@@ -378,6 +381,15 @@ case class InsertIntoHiveTable(
         isSrcLocal = false)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)

http://git-wip-us.apache.org/repos/asf/spark/blob/8db4d95c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9b26383..8dd0699 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
       }
     }
 
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab") {
+          spark.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          (1 to 3).map { i =>
+            spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+          }
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
+
     // TODO: add more tests.
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org