You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/03/04 00:59:55 UTC

spark git commit: [SPARK-13446][SQL] Support reading data from Hive 2.0.1 metastore

Repository: spark
Updated Branches:
  refs/heads/master 44281ca81 -> f5fdbe043


[SPARK-13446][SQL] Support reading data from Hive 2.0.1 metastore

### What changes were proposed in this pull request?
This PR makes Spark work with the Hive 2.0 metastore. Compared with Hive 1.2, the Hive 2.0 metastore API changed because `HOLD_DDLTIME` was removed in https://issues.apache.org/jira/browse/HIVE-12224. Based on the following Hive JIRA description, `HOLD_DDLTIME` should be removed from our internal API too (https://github.com/apache/spark/pull/17063 was submitted for that):
> This arcane feature was introduced long ago via HIVE-1394 It was broken as soon as it landed, HIVE-1442 and is thus useless. Fact that no one has fixed it since informs that its not really used by anyone. Better is to remove it so no one hits the bug of HIVE-1442

In the next PR, we will support the Hive 2.1.0 metastore, whose APIs were changed by https://issues.apache.org/jira/browse/HIVE-12730. Before that, however, we need to clean up the code that collects and sets stats.

### How was this patch tested?
Added test cases for the Hive 2.0 metastore to `VersionsSuite.scala`.

Author: Xiao Li <ga...@gmail.com>

Closes #17061 from gatorsmile/Hive2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f5fdbe04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f5fdbe04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f5fdbe04

Branch: refs/heads/master
Commit: f5fdbe04369d2c04972b7d9686d0c6c046f3d3dc
Parents: 44281ca
Author: Xiao Li <ga...@gmail.com>
Authored: Fri Mar 3 16:59:52 2017 -0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Mar 3 16:59:52 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/client/HiveClientImpl.scala  |  1 +
 .../apache/spark/sql/hive/client/HiveShim.scala | 74 ++++++++++++++++++++
 .../sql/hive/client/IsolatedClientLoader.scala  |  1 +
 .../apache/spark/sql/hive/client/package.scala  |  8 ++-
 .../hive/execution/InsertIntoHiveTable.scala    | 11 ++-
 .../sql/hive/client/HiveClientBuilder.scala     | 12 ++--
 .../spark/sql/hive/client/VersionsSuite.scala   |  9 ++-
 7 files changed, 107 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index c326ac4..8f98c8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -96,6 +96,7 @@ private[hive] class HiveClientImpl(
     case hive.v1_0 => new Shim_v1_0()
     case hive.v1_1 => new Shim_v1_1()
     case hive.v1_2 => new Shim_v1_2()
+    case hive.v2_0 => new Shim_v2_0()
   }
 
   // Create an internal session state for this HiveClientImpl.

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 9fe1c76..7280748 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -833,3 +833,77 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
   }
 
 }
+
+private[client] class Shim_v2_0 extends Shim_v1_2 {
+  private lazy val loadPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "loadPartition",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadTableMethod =
+    findMethod(
+      classOf[Hive],
+      "loadTable",
+      classOf[Path],
+      classOf[String],
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE)
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      JBoolean.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE)
+
+  override def loadPartition(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      inheritTableSpecs: Boolean,
+      isSkewedStoreAsSubdir: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+      inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
+      isSrcLocal: JBoolean, JBoolean.FALSE)
+  }
+
+  override def loadTable(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      replace: Boolean,
+      isSrcLocal: Boolean): Unit = {
+    loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, isSrcLocal: JBoolean,
+      JBoolean.FALSE, JBoolean.FALSE)
+  }
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      listBucketingEnabled: Boolean): Unit = {
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+      numDP: JInteger, listBucketingEnabled: JBoolean, JBoolean.FALSE, 0L: JLong)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index d2487a2..6f69a4a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -94,6 +94,7 @@ private[hive] object IsolatedClientLoader extends Logging {
     case "1.0" | "1.0.0" => hive.v1_0
     case "1.1" | "1.1.0" => hive.v1_1
     case "1.2" | "1.2.0" | "1.2.1" => hive.v1_2
+    case "2.0" | "2.0.0" | "2.0.1" => hive.v2_0
   }
 
   private def downloadVersion(

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 4e2193b..790ad74 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
 
 /** Support for interacting with different versions of the HiveMetastoreClient */
 package object client {
-  private[hive] abstract class HiveVersion(
+  private[hive] sealed abstract class HiveVersion(
       val fullVersion: String,
       val extraDeps: Seq[String] = Nil,
       val exclusions: Seq[String] = Nil)
@@ -62,6 +62,12 @@ package object client {
         "org.pentaho:pentaho-aggdesigner-algorithm",
         "net.hydromatic:linq4j",
         "net.hydromatic:quidem"))
+
+    case object v2_0 extends HiveVersion("2.0.1",
+      exclusions = Seq("org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    val allSupportedHiveVersions = Set(v12, v13, v14, v1_0, v1_1, v1_2, v2_0)
   }
   // scalastyle:on
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index f107149..3c57ee4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -148,9 +148,16 @@ case class InsertIntoHiveTable(
     // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
     // staging directory under the table director for Hive prior to 1.1, the staging directory will
     // be removed by Hive when Hive is trying to empty the table directory.
-    if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = Set(v1_1, v1_2, v2_0)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
       oldVersionExternalTempPath(path, hadoopConf, scratchDir)
-    } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
       newVersionExternalTempPath(path, hadoopConf, stagingDir)
     } else {
       throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
index 591a968..e85ea5a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -35,22 +35,26 @@ private[client] class HiveClientBuilder {
       Some(new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath))
   }
 
-  private def buildConf() = {
+  private def buildConf(extraConf: Map[String, String]) = {
     lazy val warehousePath = Utils.createTempDir()
     lazy val metastorePath = Utils.createTempDir()
     metastorePath.delete()
-    Map(
+    extraConf ++ Map(
       "javax.jdo.option.ConnectionURL" -> s"jdbc:derby:;databaseName=$metastorePath;create=true",
       "hive.metastore.warehouse.dir" -> warehousePath.toString)
   }
 
-  def buildClient(version: String, hadoopConf: Configuration): HiveClient = {
+  // for testing only
+  def buildClient(
+      version: String,
+      hadoopConf: Configuration,
+      extraConf: Map[String, String] = Map.empty): HiveClient = {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = version,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = sparkConf,
       hadoopConf = hadoopConf,
-      config = buildConf(),
+      config = buildConf(extraConf),
       ivyPath = ivyPath).createClient()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f5fdbe04/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 6feb277..d61d10b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -88,7 +88,7 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
 
-  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2")
+  private val versions = Seq("0.12", "0.13", "0.14", "1.0", "1.1", "1.2", "2.0")
 
   private var client: HiveClient = null
 
@@ -98,7 +98,12 @@ class VersionsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton w
       System.gc() // Hack to avoid SEGV on some JVM versions.
       val hadoopConf = new Configuration()
       hadoopConf.set("test", "success")
-      client = buildClient(version, hadoopConf)
+      // Hive changed the default of datanucleus.schema.autoCreateAll from true to false since 2.0
+      // For details, see the JIRA HIVE-6113
+      if (version == "2.0") {
+        hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
+      }
+      client = buildClient(version, hadoopConf, HiveUtils.hiveClientConfigurations(hadoopConf))
     }
 
     def table(database: String, tableName: String): CatalogTable = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org