You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/07 01:23:12 UTC

spark git commit: [SPARK-18372][SQL][BRANCH-1.6] Staging directory fail to be removed

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 70f271b56 -> 2303887ce


[SPARK-18372][SQL][BRANCH-1.6] Staging directory fail to be removed

## What changes were proposed in this pull request?

This fix addresses the bug reported in https://issues.apache.org/jira/browse/SPARK-18372 .
InsertIntoHiveTable generates a .staging directory, but this directory fails to be removed once the insert completes.

This is a backport of the Spark 2.0.x code and is related to PR #12770.

## How was this patch tested?
manual tests

Author: Mingjie Tang <mtanghortonworks.com>

Author: Mingjie Tang <mt...@hortonworks.com>
Author: Mingjie Tang <mt...@HW12398.local>

Closes #15819 from merlintang/branch-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2303887c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2303887c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2303887c

Branch: refs/heads/branch-1.6
Commit: 2303887ce659b2ee90bfb412dc17629668894b03
Parents: 70f271b
Author: Mingjie Tang <mt...@hortonworks.com>
Authored: Fri Jan 6 17:22:59 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Jan 6 17:22:59 2017 -0800

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 86 ++++++++++++++++++--
 .../spark/sql/hive/client/VersionsSuite.scala   | 40 ++++++++-
 2 files changed, 118 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2303887c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf5..bdf7117 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,24 +17,33 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
 import java.util
+import java.util.{Date, Random}
 
-import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+
+import scala.util.control.NonFatal
 
+import scala.collection.JavaConverters._
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.TaskRunner
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
+  @transient var createdTempDir: Option[Path] = None
+  val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
+  private def executionId: String = {
+    val rand: Random = new Random
+    val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+    val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+    executionId
+  }
+
+  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+    val inputPathUri: URI = inputPath.toUri
+    val inputPathName: String = inputPathUri.getPath
+    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+    val stagingPathName: String =
+      if (inputPathName.indexOf(stagingDir) == -1) {
+        new Path(inputPathName, stagingDir).toString
+      } else {
+        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+      }
+    val dir: Path =
+      fs.makeQualified(
+        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+    try {
+      if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+        throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
+      }
+      createdTempDir = Some(dir)
+      fs.deleteOnExit(dir)
+    }
+    catch {
+      case e: IOException =>
+        throw new RuntimeException(
+          "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+    }
+    return dir
+  }
+
+  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  }
+
+  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      getExtTmpPathRelTo(path.getParent, hadoopConf)
+    } else {
+      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+    }
+  }
+
+  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  }
+
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val jobConf = new JobConf(sc.hiveconf)
+    val tmpLocation = getExternalTmpPath(tableLocation, jobConf)
+
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = sc.hiveconf.getBoolean(
       ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
         holdDDLTime)
     }
 
+    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+    try {
+      createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+    }
+
     // Invalidate the cache.
     sqlContext.cacheManager.invalidateCache(table)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2303887c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 502b240..09a1067 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,14 +20,17 @@ package org.apache.spark.sql.hive.client
 import java.io.File
 
 import org.apache.hadoop.util.VersionInfo
-
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.tags.ExtendedHiveTest
 import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
 
 /**
  * A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -36,7 +39,7 @@ import org.apache.spark.util.Utils
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite  with SQLTestUtils with TestHiveSingleton with Logging {
 
   // In order to speed up test execution during development or in Jenkins, you can specify the path
   // of an existing Ivy cache:
@@ -216,5 +219,36 @@ class VersionsSuite extends SparkFunSuite with Logging {
         "as 'COMPACT' WITH DEFERRED REBUILD")
       client.reset()
     }
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    test(s"$version: Delete the temporary staging directory and files after each insert") {
+      withTempDir { tmpDir =>
+        withTable("tab", "tbl") {
+          sqlContext.sql(
+            s"""
+               |CREATE TABLE tab(c1 string)
+               |location '${tmpDir.toURI.toString}'
+             """.stripMargin)
+
+          sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+          sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
+
+          def listFiles(path: File): List[String] = {
+            val dir = path.listFiles()
+            val folders = dir.filter(_.isDirectory).toList
+            val filePaths = dir.map(_.getName).toList
+            folders.flatMap(listFiles) ++: filePaths
+          }
+          val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+          assert(listFiles(tmpDir).sorted == expectedFiles)
+        }
+      }
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org