You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/01 08:26:17 UTC
spark git commit: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore
Repository: spark
Updated Branches:
refs/heads/master 22f3d3334 -> ff1480189
[SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore
## What changes were proposed in this pull request?
This is based on https://github.com/apache/spark/pull/20668 and adds support for the Hive 2.2 and Hive 2.3 metastores.
When this PR is merged, the main credit should go to wangyum.
## How was this patch tested?
Added test cases.
Author: Yuming Wang <yu...@ebay.com>
Author: gatorsmile <ga...@gmail.com>
Closes #20671 from gatorsmile/pr-20668.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff148018
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff148018
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff148018
Branch: refs/heads/master
Commit: ff1480189b827af0be38605d566a4ee71b4c36f6
Parents: 22f3d33
Author: Yuming Wang <yu...@ebay.com>
Authored: Thu Mar 1 16:26:11 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 1 16:26:11 2018 +0800
----------------------------------------------------------------------
.../org/apache/spark/sql/hive/HiveUtils.scala | 2 +-
.../spark/sql/hive/client/HiveClientImpl.scala | 3 +-
.../apache/spark/sql/hive/client/HiveShim.scala | 8 +--
.../sql/hive/client/IsolatedClientLoader.scala | 2 +
.../apache/spark/sql/hive/client/package.scala | 10 +++-
.../sql/hive/execution/SaveAsHiveFile.scala | 3 +-
.../sql/hive/client/HiveClientVersions.scala | 3 +-
.../sql/hive/client/HiveVersionSuite.scala | 2 +-
.../spark/sql/hive/client/VersionsSuite.scala | 51 ++++++++++++++++++--
9 files changed, 72 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index c448c5a..10c9603 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging {
val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
.doc("Version of the Hive metastore. Available options are " +
- s"<code>0.12.0</code> through <code>2.1.1</code>.")
+ s"<code>0.12.0</code> through <code>2.3.2</code>.")
.stringConf
.createWithDefault(builtinHiveVersion)
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 146fa54..da9fe2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
@@ -104,6 +103,8 @@ private[hive] class HiveClientImpl(
case hive.v1_2 => new Shim_v1_2()
case hive.v2_0 => new Shim_v2_0()
case hive.v2_1 => new Shim_v2_1()
+ case hive.v2_2 => new Shim_v2_2()
+ case hive.v2_3 => new Shim_v2_3()
}
// Create an internal session state for this HiveClientImpl.
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 1eac70d..948ba54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -880,9 +880,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
}
-private[client] class Shim_v1_0 extends Shim_v0_14 {
-
-}
+private[client] class Shim_v1_0 extends Shim_v0_14
private[client] class Shim_v1_1 extends Shim_v1_0 {
@@ -1146,3 +1144,7 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
}
}
+
+private[client] class Shim_v2_2 extends Shim_v2_1
+
+private[client] class Shim_v2_3 extends Shim_v2_1
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index dac0e33..12975bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -97,6 +97,8 @@ private[hive] object IsolatedClientLoader extends Logging {
case "1.2" | "1.2.0" | "1.2.1" | "1.2.2" => hive.v1_2
case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
+ case "2.2" | "2.2.0" => hive.v2_2
+ case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" => hive.v2_3
}
private def downloadVersion(
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index c14154a..681ee92 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -71,7 +71,15 @@ package object client {
exclusions = Seq("org.apache.curator:*",
"org.pentaho:pentaho-aggdesigner-algorithm"))
- val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
+ case object v2_2 extends HiveVersion("2.2.0",
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ case object v2_3 extends HiveVersion("2.3.2",
+ exclusions = Seq("org.apache.curator:*",
+ "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+ val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
}
// scalastyle:on
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index e484356..6a7b25b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -114,7 +114,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
- val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+ val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
+ Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
// Ensure all the supported versions are considered here.
assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 2e7dfde..30592a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -22,5 +22,6 @@ import scala.collection.immutable.IndexedSeq
import org.apache.spark.SparkFunSuite
private[client] trait HiveClientVersions {
- protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+ protected val versions =
+ IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index a70fb64..e5963d0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -34,7 +34,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
- if (version == "2.0" || version == "2.1") {
+ if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 72536b8..6176273 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
import java.net.URI
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
@@ -110,7 +111,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
}
- private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+ private val versions =
+ Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
private var client: HiveClient = null
@@ -125,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
// Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
// hive.metastore.schema.verification from false to true since 2.0
// For details, see the JIRA HIVE-6113 and HIVE-12463
- if (version == "2.0" || version == "2.1") {
+ if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
hadoopConf.set("hive.metastore.schema.verification", "false")
}
@@ -422,15 +424,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: alterPartitions") {
val spec = Map("key1" -> "1", "key2" -> "2")
+ val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
val storage = storageFormat.copy(
locationUri = Some(newLocation),
// needed for 0.12 alter partitions
serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
- val partition = CatalogTablePartition(spec, storage)
+ val partition = CatalogTablePartition(spec, storage, parameters)
client.alterPartitions("default", "src_part", Seq(partition))
assert(client.getPartition("default", "src_part", spec)
.storage.locationUri == Some(newLocation))
+ assert(client.getPartition("default", "src_part", spec)
+ .parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
}
test(s"$version: dropPartitions") {
@@ -633,6 +638,46 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
}
+ test(s"$version: CREATE Partitioned TABLE AS SELECT") {
+ withTable("tbl") {
+ versionSpark.sql(
+ """
+ |CREATE TABLE tbl(c1 string)
+ |PARTITIONED BY (ds STRING)
+ """.stripMargin)
+ versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
+
+ assert(versionSpark.table("tbl").collect().toSeq == Seq(Row("1", "2")))
+ val partMeta = versionSpark.sessionState.catalog.getPartition(
+ TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
+ val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+ val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
+ // Except 0.12, all the following versions will fill the Hive-generated statistics
+ if (version == "0.12") {
+ assert(totalSize.isEmpty && numFiles.isEmpty)
+ } else {
+ assert(totalSize.nonEmpty && numFiles.nonEmpty)
+ }
+
+ versionSpark.sql(
+ """
+ |ALTER TABLE tbl PARTITION (ds='2')
+ |SET SERDEPROPERTIES ('newKey' = 'vvv')
+ """.stripMargin)
+ val newPartMeta = versionSpark.sessionState.catalog.getPartition(
+ TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
+
+ val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+ val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
+ // Except 0.12, all the following versions will fill the Hive-generated statistics
+ if (version == "0.12") {
+ assert(newTotalSize.isEmpty && newNumFiles.isEmpty)
+ } else {
+ assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
+ }
+ }
+ }
+
test(s"$version: Delete the temporary staging directory and files after each insert") {
withTempDir { tmpDir =>
withTable("tab") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org