Posted to commits@spark.apache.org by ma...@apache.org on 2015/06/29 20:53:32 UTC

spark git commit: [SPARK-8066, SPARK-8067] [hive] Add support for Hive 1.0, 1.1 and 1.2.

Repository: spark
Updated Branches:
  refs/heads/master ed413bcc7 -> 3664ee25f


[SPARK-8066, SPARK-8067] [hive] Add support for Hive 1.0, 1.1 and 1.2.

Allow HiveContext to connect to metastores of those versions; some new shims
had to be added to account for internal Hive APIs that changed between
versions.
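
For context, a minimal usage sketch of what this enables. It assumes the
"spark.sql.hive.metastore.version" and "spark.sql.hive.metastore.jars"
settings are what select the metastore client version (the path that
ultimately goes through IsolatedClientLoader below); treat it as
illustrative, not the definitive configuration.

    // Sketch: point a HiveContext at a Hive 1.2.0 metastore. The two
    // "spark.sql.hive.metastore.*" settings are assumptions about how the
    // client version is selected; adjust them to your deployment.
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext

    object Hive12MetastoreExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("hive-1.2-metastore")
          .set("spark.sql.hive.metastore.version", "1.2.0")
          // "maven" asks Spark to resolve the matching Hive jars via Ivy.
          .set("spark.sql.hive.metastore.jars", "maven")
        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)
        hiveContext.sql("SHOW TABLES").collect().foreach(println)
        sc.stop()
      }
    }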

A new test was added to exercise the "reset()" path, which now also requires
a shim; the test code was also changed to cache Ivy dependencies in a
dedicated directory (a fixed path under the system temp directory, see the
VersionsSuite change below). Without that, I consistently ran into issues
with Ivy being confused by my existing caches.
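
As a minimal sketch of the resulting test-side call (mirroring the new
ivyPath parameter on IsolatedClientLoader.forVersion in the diff below; the
wrapper object is hypothetical, and the snippet sits in the
org.apache.spark.sql.hive package only so the private[hive] loader is
visible):

    package org.apache.spark.sql.hive

    import java.io.File

    import org.apache.spark.sql.hive.client.IsolatedClientLoader

    object IvyCacheExample {
      def main(args: Array[String]): Unit = {
        // Cache resolved Hive artifacts under a stable directory so repeated
        // runs skip re-downloading them, instead of using the default cache.
        val ivyPath = Some(
          new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath)
        val client = IsolatedClientLoader.forVersion("1.1.0", Map.empty, ivyPath).client
        println(s"Created metastore client: $client")
      }
    }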

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #7026 from vanzin/SPARK-8067 and squashes the following commits:

3e2e67b [Marcelo Vanzin] [SPARK-8066, SPARK-8067] [hive] Add support for Hive 1.0, 1.1 and 1.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3664ee25
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3664ee25
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3664ee25

Branch: refs/heads/master
Commit: 3664ee25f0a67de5ba76e9487a55a55216ae589f
Parents: ed413bc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Mon Jun 29 11:53:17 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jun 29 11:53:17 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/client/ClientWrapper.scala   |  5 +-
 .../apache/spark/sql/hive/client/HiveShim.scala | 70 +++++++++++++++++++-
 .../sql/hive/client/IsolatedClientLoader.scala  | 13 ++--
 .../apache/spark/sql/hive/client/package.scala  | 33 +++++++--
 .../spark/sql/hive/client/VersionsSuite.scala   | 25 +++++--
 5 files changed, 131 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3664ee25/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 2f771d7..4c708ce 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -97,6 +97,9 @@ private[hive] class ClientWrapper(
     case hive.v12 => new Shim_v0_12()
     case hive.v13 => new Shim_v0_13()
     case hive.v14 => new Shim_v0_14()
+    case hive.v1_0 => new Shim_v1_0()
+    case hive.v1_1 => new Shim_v1_1()
+    case hive.v1_2 => new Shim_v1_2()
   }
 
   // Create an internal session state for this ClientWrapper.
@@ -456,7 +459,7 @@ private[hive] class ClientWrapper(
         logDebug(s"Deleting table $t")
         val table = client.getTable("default", t)
         client.getIndexes("default", t, 255).foreach { index =>
-          client.dropIndex("default", t, index.getIndexName, true)
+          shim.dropIndex(client, "default", t, index.getIndexName)
         }
         if (!table.isIndexTable) {
           client.dropTable("default", t)

http://git-wip-us.apache.org/repos/asf/spark/blob/3664ee25/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index e7c1779..1fa9d27 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.lang.{Boolean => JBoolean, Integer => JInteger}
+import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
 import java.lang.reflect.{Method, Modifier}
 import java.net.URI
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
@@ -94,6 +94,8 @@ private[client] sealed abstract class Shim {
       holdDDLTime: Boolean,
       listBucketingEnabled: Boolean): Unit
 
+  def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
+
   protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
     val method = findMethod(klass, name, args: _*)
     require(Modifier.isStatic(method.getModifiers()),
@@ -166,6 +168,14 @@ private[client] class Shim_v0_12 extends Shim {
       JInteger.TYPE,
       JBoolean.TYPE,
       JBoolean.TYPE)
+  private lazy val dropIndexMethod =
+    findMethod(
+      classOf[Hive],
+      "dropIndex",
+      classOf[String],
+      classOf[String],
+      classOf[String],
+      JBoolean.TYPE)
 
   override def setCurrentSessionState(state: SessionState): Unit = {
     // Starting from Hive 0.13, setCurrentSessionState will internally override
@@ -234,6 +244,10 @@ private[client] class Shim_v0_12 extends Shim {
       numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean)
   }
 
+  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
+    dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
+  }
+
 }
 
 private[client] class Shim_v0_13 extends Shim_v0_12 {
@@ -379,3 +393,57 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       TimeUnit.MILLISECONDS).asInstanceOf[Long]
   }
 }
+
+private[client] class Shim_v1_0 extends Shim_v0_14 {
+
+}
+
+private[client] class Shim_v1_1 extends Shim_v1_0 {
+
+  private lazy val dropIndexMethod =
+    findMethod(
+      classOf[Hive],
+      "dropIndex",
+      classOf[String],
+      classOf[String],
+      classOf[String],
+      JBoolean.TYPE,
+      JBoolean.TYPE)
+
+  override def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit = {
+    dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean, true: JBoolean)
+  }
+
+}
+
+private[client] class Shim_v1_2 extends Shim_v1_1 {
+
+  private lazy val loadDynamicPartitionsMethod =
+    findMethod(
+      classOf[Hive],
+      "loadDynamicPartitions",
+      classOf[Path],
+      classOf[String],
+      classOf[JMap[String, String]],
+      JBoolean.TYPE,
+      JInteger.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JLong.TYPE)
+
+  override def loadDynamicPartitions(
+      hive: Hive,
+      loadPath: Path,
+      tableName: String,
+      partSpec: JMap[String, String],
+      replace: Boolean,
+      numDP: Int,
+      holdDDLTime: Boolean,
+      listBucketingEnabled: Boolean): Unit = {
+    loadDynamicPartitionsMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
+      numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE,
+      0: JLong)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3664ee25/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 0934ad5..3d609a6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -41,9 +41,11 @@ private[hive] object IsolatedClientLoader {
    */
   def forVersion(
       version: String,
-      config: Map[String, String] = Map.empty): IsolatedClientLoader = synchronized {
+      config: Map[String, String] = Map.empty,
+      ivyPath: Option[String] = None): IsolatedClientLoader = synchronized {
     val resolvedVersion = hiveVersion(version)
-    val files = resolvedVersions.getOrElseUpdate(resolvedVersion, downloadVersion(resolvedVersion))
+    val files = resolvedVersions.getOrElseUpdate(resolvedVersion,
+      downloadVersion(resolvedVersion, ivyPath))
     new IsolatedClientLoader(hiveVersion(version), files, config)
   }
 
@@ -51,9 +53,12 @@ private[hive] object IsolatedClientLoader {
     case "12" | "0.12" | "0.12.0" => hive.v12
     case "13" | "0.13" | "0.13.0" | "0.13.1" => hive.v13
     case "14" | "0.14" | "0.14.0" => hive.v14
+    case "1.0" | "1.0.0" => hive.v1_0
+    case "1.1" | "1.1.0" => hive.v1_1
+    case "1.2" | "1.2.0" => hive.v1_2
   }
 
-  private def downloadVersion(version: HiveVersion): Seq[URL] = {
+  private def downloadVersion(version: HiveVersion, ivyPath: Option[String]): Seq[URL] = {
     val hiveArtifacts = version.extraDeps ++
       Seq("hive-metastore", "hive-exec", "hive-common", "hive-serde")
         .map(a => s"org.apache.hive:$a:${version.fullVersion}") ++
@@ -64,7 +69,7 @@ private[hive] object IsolatedClientLoader {
       SparkSubmitUtils.resolveMavenCoordinates(
         hiveArtifacts.mkString(","),
         Some("http://www.datanucleus.org/downloads/maven2"),
-        None,
+        ivyPath,
         exclusions = version.exclusions)
     }
     val allFiles = classpath.split(",").map(new File(_)).toSet

http://git-wip-us.apache.org/repos/asf/spark/blob/3664ee25/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index 27a3d8f..b48082f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -32,13 +32,36 @@ package object client {
     // Hive 0.14 depends on calcite 0.9.2-incubating-SNAPSHOT which does not exist in
     // maven central anymore, so override those with a version that exists.
     //
-    // org.pentaho:pentaho-aggdesigner-algorithm is also nowhere to be found, so exclude
-    // it explicitly. If it's needed by the metastore client, users will have to dig it
-    // out of somewhere and use configuration to point Spark at the correct jars.
+    // The other excluded dependencies are also nowhere to be found, so exclude them explicitly. If
+    // they're needed by the metastore client, users will have to dig them out of somewhere and use
+    // configuration to point Spark at the correct jars.
     case object v14 extends HiveVersion("0.14.0",
-      Seq("org.apache.calcite:calcite-core:1.3.0-incubating",
+      extraDeps = Seq("org.apache.calcite:calcite-core:1.3.0-incubating",
         "org.apache.calcite:calcite-avatica:1.3.0-incubating"),
-      Seq("org.pentaho:pentaho-aggdesigner-algorithm"))
+      exclusions = Seq("org.pentaho:pentaho-aggdesigner-algorithm"))
+
+    case object v1_0 extends HiveVersion("1.0.0",
+      exclusions = Seq("eigenbase:eigenbase-properties",
+        "org.pentaho:pentaho-aggdesigner-algorithm",
+        "net.hydromatic:linq4j",
+        "net.hydromatic:quidem"))
+
+    // The curator dependency was added to the exclusions here because it seems to confuse the ivy
+    // library. org.apache.curator:curator is a pom dependency but ivy tries to find the jar for it,
+    // and fails.
+    case object v1_1 extends HiveVersion("1.1.0",
+      exclusions = Seq("eigenbase:eigenbase-properties",
+        "org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm",
+        "net.hydromatic:linq4j",
+        "net.hydromatic:quidem"))
+
+    case object v1_2 extends HiveVersion("1.2.0",
+      exclusions = Seq("eigenbase:eigenbase-properties",
+        "org.apache.curator:*",
+        "org.pentaho:pentaho-aggdesigner-algorithm",
+        "net.hydromatic:linq4j",
+        "net.hydromatic:quidem"))
   }
   // scalastyle:on
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3664ee25/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9a57165..d52e162 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.client
 
+import java.io.File
+
 import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.util.Utils
@@ -28,6 +30,12 @@ import org.apache.spark.util.Utils
  * is not fully tested.
  */
 class VersionsSuite extends SparkFunSuite with Logging {
+
+  // Do not use a temp path here to speed up subsequent executions of the unit test during
+  // development.
+  private val ivyPath = Some(
+    new File(sys.props("java.io.tmpdir"), "hive-ivy-cache").getAbsolutePath())
+
   private def buildConf() = {
     lazy val warehousePath = Utils.createTempDir()
     lazy val metastorePath = Utils.createTempDir()
@@ -38,7 +46,7 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }
 
   test("success sanity check") {
-    val badClient = IsolatedClientLoader.forVersion("13", buildConf()).client
+    val badClient = IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client
     val db = new HiveDatabase("default", "")
     badClient.createDatabase(db)
   }
@@ -67,19 +75,21 @@ class VersionsSuite extends SparkFunSuite with Logging {
   // TODO: currently only works on mysql where we manually create the schema...
   ignore("failure sanity check") {
     val e = intercept[Throwable] {
-      val badClient = quietly { IsolatedClientLoader.forVersion("13", buildConf()).client }
+      val badClient = quietly {
+        IsolatedClientLoader.forVersion("13", buildConf(), ivyPath).client
+      }
     }
     assert(getNestedMessages(e) contains "Unknown column 'A0.OWNER_NAME' in 'field list'")
   }
 
-  private val versions = Seq("12", "13", "14")
+  private val versions = Seq("12", "13", "14", "1.0.0", "1.1.0", "1.2.0")
 
   private var client: ClientInterface = null
 
   versions.foreach { version =>
     test(s"$version: create client") {
       client = null
-      client = IsolatedClientLoader.forVersion(version, buildConf()).client
+      client = IsolatedClientLoader.forVersion(version, buildConf(), ivyPath).client
     }
 
     test(s"$version: createDatabase") {
@@ -170,5 +180,12 @@ class VersionsSuite extends SparkFunSuite with Logging {
         false,
         false)
     }
+
+    test(s"$version: create index and reset") {
+      client.runSqlHive("CREATE TABLE indexed_table (key INT)")
+      client.runSqlHive("CREATE INDEX index_1 ON TABLE indexed_table(key) " +
+        "as 'COMPACT' WITH DEFERRED REBUILD")
+      client.reset()
+    }
   }
 }

