Posted to commits@spark.apache.org by li...@apache.org on 2017/01/07 01:23:12 UTC
spark git commit: [SPARK-18372][SQL][BRANCH-1.6] Staging directory fails to be removed
Repository: spark
Updated Branches:
refs/heads/branch-1.6 70f271b56 -> 2303887ce
[SPARK-18372][SQL][BRANCH-1.6] Staging directory fails to be removed
## What changes were proposed in this pull request?
This fix addresses the bug reported in https://issues.apache.org/jira/browse/SPARK-18372.
InsertIntoHiveTable generates a .staging directory, but this directory fails to be removed in the end.
This is a backport of the Spark 2.0.x code, and is related to PR #12770.
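The essence of the change, sketched below, is to track the staging directory as it is created and delete it explicitly once the insert completes, keeping `deleteOnExit` only as a fallback for abnormal termination. This is a minimal illustration of the pattern under simplified names, not the full patch; see the diff below for the actual code.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.util.control.NonFatal

// Remember the staging directory we created so it can be removed explicitly.
var createdTempDir: Option[Path] = None

def createStagingDir(dir: Path, hadoopConf: Configuration): Path = {
  val fs: FileSystem = dir.getFileSystem(hadoopConf)
  if (!fs.mkdirs(dir)) {
    throw new IllegalStateException(s"Cannot create staging directory '$dir'")
  }
  createdTempDir = Some(dir)
  fs.deleteOnExit(dir) // fallback: only fires at normal JVM termination
  dir
}

def cleanupStagingDir(hadoopConf: Configuration): Unit = {
  // Explicit cleanup after the insert finishes; deleteOnExit alone never
  // fires for long-lived sessions such as an interactive shell.
  try {
    createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
  } catch {
    case NonFatal(e) =>
      Console.err.println(s"Unable to delete staging directory: ${e.getMessage}")
  }
}
```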
## How was this patch tested?
Manual tests.
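For reference, a hedged sketch of the kind of manual check this implies, run in a Spark 1.6 shell where `sqlContext` is a `HiveContext`; the table names and warehouse path are illustrative, and `.hive-staging` is assumed to be the default value of `hive.exec.stagingdir`:

```scala
import java.io.File

sqlContext.sql("CREATE TABLE src_tbl AS SELECT 1 AS a")
sqlContext.sql("CREATE TABLE dst_tbl (c1 INT)")
sqlContext.sql("INSERT OVERWRITE TABLE dst_tbl SELECT a FROM src_tbl")

// Before the fix, leftover .hive-staging_hive_* directories accumulated under
// the table location; after it, only data files and .crc checksums remain.
val tableDir = new File("/user/hive/warehouse/dst_tbl") // illustrative path
tableDir.listFiles().map(_.getName).sorted.foreach(println)
```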
Author: Mingjie Tang <mtang@hortonworks.com>
Author: Mingjie Tang <mt...@hortonworks.com>
Author: Mingjie Tang <mt...@HW12398.local>
Closes #15819 from merlintang/branch-1.6.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2303887c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2303887c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2303887c
Branch: refs/heads/branch-1.6
Commit: 2303887ce659b2ee90bfb412dc17629668894b03
Parents: 70f271b
Author: Mingjie Tang <mt...@hortonworks.com>
Authored: Fri Jan 6 17:22:59 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Jan 6 17:22:59 2017 -0800
----------------------------------------------------------------------
.../hive/execution/InsertIntoHiveTable.scala | 86 ++++++++++++++++++--
.../spark/sql/hive/client/VersionsSuite.scala | 40 ++++++++-
2 files changed, 118 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2303887c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f936cf5..bdf7117 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,24 +17,33 @@
package org.apache.spark.sql.hive.execution
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
import java.util
+import java.util.{Date, Random}
-import scala.collection.JavaConverters._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+
+import scala.util.control.NonFatal
+import scala.collection.JavaConverters._
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.DataType
@@ -54,6 +63,63 @@ case class InsertIntoHiveTable(
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val catalog = sc.catalog
+ @transient var createdTempDir: Option[Path] = None
+ val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
+ private def executionId: String = {
+ val rand: Random = new Random
+ val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+ val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+ executionId
+ }
+
+ private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+ val inputPathUri: URI = inputPath.toUri
+ val inputPathName: String = inputPathUri.getPath
+ val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+ val stagingPathName: String =
+ if (inputPathName.indexOf(stagingDir) == -1) {
+ new Path(inputPathName, stagingDir).toString
+ } else {
+ inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+ }
+ val dir: Path =
+ fs.makeQualified(
+ new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+ logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+ try {
+ if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+ }
+ createdTempDir = Some(dir)
+ fs.deleteOnExit(dir)
+ }
+ catch {
+ case e: IOException =>
+ throw new RuntimeException(
+ "Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
+
+ }
+ return dir
+ }
+
+ private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
+ getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+ }
+
+ def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+ val extURI: URI = path.toUri
+ if (extURI.getScheme == "viewfs") {
+ getExtTmpPathRelTo(path.getParent, hadoopConf)
+ } else {
+ new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+ }
+ }
+
+ def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
+ new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+ }
+
private def newSerializer(tableDesc: TableDesc): Serializer = {
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
serializer.initialize(null, tableDesc.getProperties)
@@ -129,7 +195,9 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+ val jobConf = new JobConf(sc.hiveconf)
+ val tmpLocation = getExternalTmpPath(tableLocation, jobConf)
+
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = sc.hiveconf.getBoolean(
ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
@@ -175,7 +243,6 @@ case class InsertIntoHiveTable(
}
}
- val jobConf = new JobConf(sc.hiveconf)
val jobConfSer = new SerializableJobConf(jobConf)
// When speculation is on and output committer class name contains "Direct", we should warn
@@ -260,6 +327,15 @@ case class InsertIntoHiveTable(
holdDDLTime)
}
+ // Attempt to delete the staging directory and the files it contains. If this fails, the files
+ // are expected to be dropped at the normal termination of the VM, since deleteOnExit is used.
+ try {
+ createdTempDir.foreach { path => path.getFileSystem(jobConf).delete(path, true) }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+ }
+
// Invalidate the cache.
sqlContext.cacheManager.invalidateCache(table)
http://git-wip-us.apache.org/repos/asf/spark/blob/2303887c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 502b240..09a1067 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,14 +20,17 @@ package org.apache.spark.sql.hive.client
import java.io.File
import org.apache.hadoop.util.VersionInfo
-
+import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.tags.ExtendedHiveTest
import org.apache.spark.util.Utils
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
/**
* A simple set of tests that call the methods of a hive ClientInterface, loading different version
@@ -36,7 +39,7 @@ import org.apache.spark.util.Utils
* is not fully tested.
*/
@ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
// In order to speed up test execution during development or in Jenkins, you can specify the path
// of an existing Ivy cache:
@@ -216,5 +219,36 @@ class VersionsSuite extends SparkFunSuite with Logging {
"as 'COMPACT' WITH DEFERRED REBUILD")
client.reset()
}
+
+ test(s"$version: CREATE TABLE AS SELECT") {
+ withTable("tbl") {
+ sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+ assert(sqlContext.table("tbl").collect().toSeq == Seq(Row(1)))
+ }
+ }
+
+ test(s"$version: Delete the temporary staging directory and files after each insert") {
+ withTempDir { tmpDir =>
+ withTable("tab", "tbl") {
+ sqlContext.sql(
+ s"""
+ |CREATE TABLE tab(c1 string)
+ |location '${tmpDir.toURI.toString}'
+ """.stripMargin)
+
+ sqlContext.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+ sqlContext.sql(s"INSERT OVERWRITE TABLE tab SELECT * from tbl ")
+
+ def listFiles(path: File): List[String] = {
+ val dir = path.listFiles()
+ val folders = dir.filter(_.isDirectory).toList
+ val filePaths = dir.map(_.getName).toList
+ folders.flatMap(listFiles) ++: filePaths
+ }
+ val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+ assert(listFiles(tmpDir).sorted == expectedFiles)
+ }
+ }
+ }
}
}