You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/12/13 17:47:01 UTC
spark git commit: [SPARK-18675][SQL] CTAS for hive serde table should
work for all hive versions
Repository: spark
Updated Branches:
refs/heads/master 096f868b7 -> d53f18cae
[SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions
## What changes were proposed in this pull request?
Before hive 1.1, when inserting into a table, hive will create the staging directory under a common scratch directory. After the writing is finished, hive will simply empty the table directory and move the staging directory to it.
Starting with hive 1.1, hive will create the staging directory under the table directory, and when moving the staging directory to the table directory, hive will still empty the table directory, but will exclude the staging directory there.
In `InsertIntoHiveTable`, we simply copy the code from hive 1.2, which means we will always create the staging directory under the table directory, no matter what the hive version is. This causes problems if the hive version is prior to 1.1, because the staging directory will be removed by hive when hive is trying to empty the table directory.
This PR copies the code from hive 0.13, so that we have 2 branches for creating the staging directory. If the hive version is prior to 1.1, we go to the old-style branch (i.e. create the staging directory under a common scratch directory); otherwise, we go to the new-style branch (i.e. create the staging directory under the table directory).
## How was this patch tested?
Added a new `CREATE TABLE AS SELECT` end-to-end test to `VersionsSuite`, registered once per hive version.
Author: Wenchen Fan <we...@databricks.com>
Closes #16104 from cloud-fan/hive-0.13.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d53f18ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d53f18ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d53f18ca
Branch: refs/heads/master
Commit: d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a
Parents: 096f868
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Dec 13 09:46:58 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Dec 13 09:46:58 2016 -0800
----------------------------------------------------------------------
.../hive/execution/InsertIntoHiveTable.scala | 68 +++++++++++++++++---
.../spark/sql/hive/client/VersionsSuite.scala | 19 +++++-
2 files changed, 75 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d53f18ca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index db2239d..82c7b1a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,7 +22,6 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,6 +85,7 @@ case class InsertIntoHiveTable(
val hadoopConf = sessionState.newHadoopConf()
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+ val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
private def executionId: String = {
val rand: Random = new Random
@@ -93,7 +93,7 @@ case class InsertIntoHiveTable(
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}
- private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+ private def getStagingDir(inputPath: Path): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,69 @@ case class InsertIntoHiveTable(
return dir
}
- private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
- getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+ private def getExternalScratchDir(extURI: URI): Path = {
+ getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
}
- def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+ def getExternalTmpPath(path: Path): Path = {
+ import org.apache.spark.sql.hive.client.hive._
+
+ val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+ // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+ // a common scratch directory. After the writing is finished, Hive will simply empty the table
+ // directory and move the staging directory to it.
+ // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+ // moving staging directory to table directory, Hive will still empty the table directory, but
+ // will exclude the staging directory there.
+ // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+ // staging directory under the table directory for Hive prior to 1.1, the staging directory will
+ // be removed by Hive when Hive is trying to empty the table directory.
+ if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+ oldVersionExternalTempPath(path)
+ } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+ newVersionExternalTempPath(path)
+ } else {
+ throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+ }
+ }
+
+ // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+ def oldVersionExternalTempPath(path: Path): Path = {
+ val extURI: URI = path.toUri
+ val scratchPath = new Path(scratchDir, executionId)
+ var dirPath = new Path(
+ extURI.getScheme,
+ extURI.getAuthority,
+ scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+ try {
+ val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+ dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+ if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+ }
+ fs.deleteOnExit(dirPath)
+ } catch {
+ case e: IOException =>
+ throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+
+ }
+ dirPath
+ }
+
+ // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+ def newVersionExternalTempPath(path: Path): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path.getParent, hadoopConf)
+ getExtTmpPathRelTo(path.getParent)
} else {
- new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+ new Path(getExternalScratchDir(extURI), "-ext-10000")
}
}
- def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
- new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+ def getExtTmpPathRelTo(path: Path): Path = {
+ new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
}
private def saveAsHiveFile(
@@ -172,7 +220,7 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+ val tmpLocation = getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
http://git-wip-us.apache.org/repos/asf/spark/blob/d53f18ca/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a001048..9b26383 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient
@@ -532,5 +534,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.reset()
assert(client.listTables("default").isEmpty)
}
+
+ ///////////////////////////////////////////////////////////////////////////
+ // End-To-End tests
+ ///////////////////////////////////////////////////////////////////////////
+
+ test(s"$version: CREATE TABLE AS SELECT") {
+ withTable("tbl") {
+ spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+ assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+ }
+ }
+
+ // TODO: add more tests.
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org