You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/03/01 08:26:17 UTC

spark git commit: [SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore

Repository: spark
Updated Branches:
  refs/heads/master 22f3d3334 -> ff1480189


[SPARK-23510][SQL] Support Hive 2.2 and Hive 2.3 metastore

## What changes were proposed in this pull request?
This is based on https://github.com/apache/spark/pull/20668 for supporting Hive 2.2 and Hive 2.3 metastore.

When we merge the PR, we should give the major credit to wangyum

## How was this patch tested?
Added the test cases

Author: Yuming Wang <yu...@ebay.com>
Author: gatorsmile <ga...@gmail.com>

Closes #20671 from gatorsmile/pr-20668.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff148018
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff148018
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff148018

Branch: refs/heads/master
Commit: ff1480189b827af0be38605d566a4ee71b4c36f6
Parents: 22f3d33
Author: Yuming Wang <yu...@ebay.com>
Authored: Thu Mar 1 16:26:11 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 1 16:26:11 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveUtils.scala   |  2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  3 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |  8 +--
 .../sql/hive/client/IsolatedClientLoader.scala  |  2 +
 .../apache/spark/sql/hive/client/package.scala  | 10 +++-
 .../sql/hive/execution/SaveAsHiveFile.scala     |  3 +-
 .../sql/hive/client/HiveClientVersions.scala    |  3 +-
 .../sql/hive/client/HiveVersionSuite.scala      |  2 +-
 .../spark/sql/hive/client/VersionsSuite.scala   | 51 ++++++++++++++++++--
 9 files changed, 72 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index c448c5a..10c9603 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging {
 
   val HIVE_METASTORE_VERSION = buildConf("spark.sql.hive.metastore.version")
     .doc("Version of the Hive metastore. Available options are " +
-        s"<code>0.12.0</code> through <code>2.1.1</code>.")
+        s"<code>0.12.0</code> through <code>2.3.2</code>.")
     .stringConf
     .createWithDefault(builtinHiveVersion)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 146fa54..da9fe2d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.conf.HiveConf
@@ -104,6 +103,8 @@ private[hive] class HiveClientImpl(
     case hive.v1_2 => new Shim_v1_2()
     case hive.v2_0 => new Shim_v2_0()
     case hive.v2_1 => new Shim_v2_1()
+    case hive.v2_2 => new Shim_v2_2()
+    case hive.v2_3 => new Shim_v2_3()
   }
 
   // Create an internal session state for this HiveClientImpl.

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 1eac70d..948ba54 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -880,9 +880,7 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
 
 }
 
-private[client] class Shim_v1_0 extends Shim_v0_14 {
-
-}
+private[client] class Shim_v1_0 extends Shim_v0_14
 
 private[client] class Shim_v1_1 extends Shim_v1_0 {
 
@@ -1146,3 +1144,7 @@ private[client] class Shim_v2_1 extends Shim_v2_0 {
     alterPartitionsMethod.invoke(hive, tableName, newParts, environmentContextInAlterTable)
   }
 }
+
+private[client] class Shim_v2_2 extends Shim_v2_1
+
+private[client] class Shim_v2_3 extends Shim_v2_1

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index dac0e33..12975bc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -97,6 +97,8 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "1.2" | "1.2.0" | "1.2.1" | "1.2.2" => hive.v1_2
     case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
     case "2.1" | "2.1.0" | "2.1.1" => hive.v2_1
+    case "2.2" | "2.2.0" => hive.v2_2
+    case "2.3" | "2.3.0" | "2.3.1" | "2.3.2" => hive.v2_3
   }
 
   private def downloadVersion(

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index c14154a..681ee92 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -71,7 +71,15 @@ package object client {
       exclusions = Seq("org.apache.curator:*",
         "org.pentaho:pentaho-aggdesigner-algorithm"))
 
-    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1)
+    case object v2_2 extends HiveVersion("2.2.0",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    case object v2_3 extends HiveVersion("2.3.2",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
   }
   // scalastyle:on
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
index e484356..6a7b25b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala
@@ -114,7 +114,8 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand {
     // staging directory under the table director for Hive prior to 1.1, the staging directory will
     // be removed by Hive when Hive is trying to empty the table directory.
     val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
-    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0, v2_1)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3)
 
     // Ensure all the supported versions are considered here.
     assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
index 2e7dfde..30592a3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala
@@ -22,5 +22,6 @@ import scala.collection.immutable.IndexedSeq
 import org.apache.spark.SparkFunSuite
 
 private[client] trait HiveClientVersions {
-  protected val versions = IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+  protected val versions =
+    IndexedSeq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index a70fb64..e5963d0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -34,7 +34,7 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
-    if (version == "2.0" || version == "2.1") {
+    if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff148018/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 72536b8..6176273 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.apache.hadoop.mapred.TextInputFormat
@@ -110,7 +111,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
 
-  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1")
+  private val versions =
+    Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0", "2.1", "2.2", "2.3")
 
   private var client: HiveClient = null
 
@@ -125,7 +127,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
       // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
       // hive.metastore.schema.verification from false to true since 2.0
       // For details, see the JIRA HIVE-6113 and HIVE-12463
-      if (version == "2.0" || version == "2.1") {
+      if (version == "2.0" || version == "2.1" || version == "2.2" || version == "2.3") {
         hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
         hadoopConf.set("hive.metastore.schema.verification", "false")
       }
@@ -422,15 +424,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
     test(s"$version: alterPartitions") {
       val spec = Map("key1" -> "1", "key2" -> "2")
+      val parameters = Map(StatsSetupConst.TOTAL_SIZE -> "0", StatsSetupConst.NUM_FILES -> "1")
       val newLocation = new URI(Utils.createTempDir().toURI.toString.stripSuffix("/"))
       val storage = storageFormat.copy(
         locationUri = Some(newLocation),
         // needed for 0.12 alter partitions
         serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-      val partition = CatalogTablePartition(spec, storage)
+      val partition = CatalogTablePartition(spec, storage, parameters)
       client.alterPartitions("default", "src_part", Seq(partition))
       assert(client.getPartition("default", "src_part", spec)
         .storage.locationUri == Some(newLocation))
+      assert(client.getPartition("default", "src_part", spec)
+        .parameters.get(StatsSetupConst.TOTAL_SIZE) == Some("0"))
     }
 
     test(s"$version: dropPartitions") {
@@ -633,6 +638,46 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: CREATE Partitioned TABLE AS SELECT") {
+      withTable("tbl") {
+        versionSpark.sql(
+          """
+            |CREATE TABLE tbl(c1 string)
+            |PARTITIONED BY (ds STRING)
+          """.stripMargin)
+        versionSpark.sql("INSERT OVERWRITE TABLE tbl partition (ds='2') SELECT '1'")
+
+        assert(versionSpark.table("tbl").collect().toSeq == Seq(Row("1", "2")))
+        val partMeta = versionSpark.sessionState.catalog.getPartition(
+          TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
+        val totalSize = partMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        val numFiles = partMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
+        // Except 0.12, all the following versions will fill the Hive-generated statistics
+        if (version == "0.12") {
+          assert(totalSize.isEmpty && numFiles.isEmpty)
+        } else {
+          assert(totalSize.nonEmpty && numFiles.nonEmpty)
+        }
+
+        versionSpark.sql(
+          """
+            |ALTER TABLE tbl PARTITION (ds='2')
+            |SET SERDEPROPERTIES ('newKey' = 'vvv')
+          """.stripMargin)
+        val newPartMeta = versionSpark.sessionState.catalog.getPartition(
+          TableIdentifier("tbl"), spec = Map("ds" -> "2")).parameters
+
+        val newTotalSize = newPartMeta.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong)
+        val newNumFiles = newPartMeta.get(StatsSetupConst.NUM_FILES).map(_.toLong)
+        // Except 0.12, all the following versions will fill the Hive-generated statistics
+        if (version == "0.12") {
+          assert(newTotalSize.isEmpty && newNumFiles.isEmpty)
+        } else {
+          assert(newTotalSize.nonEmpty && newNumFiles.nonEmpty)
+        }
+      }
+    }
+
     test(s"$version: Delete the temporary staging directory and files after each insert") {
       withTempDir { tmpDir =>
         withTable("tab") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org