You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/01/23 20:51:46 UTC

spark git commit: [SPARK-17088][HIVE] Fix 'sharesHadoopClasses' option when creating client.

Repository: spark
Updated Branches:
  refs/heads/master bdebb8e48 -> dc4761fd8


[SPARK-17088][HIVE] Fix 'sharesHadoopClasses' option when creating client.

Because the call to the constructor of HiveClientImpl crosses class loader
boundaries, different versions of the same class (Configuration in this
case) were loaded, and that caused a runtime error when instantiating the
client. By using a safer type in the signature of the constructor, it's
possible to avoid the problem.

I considered removing 'sharesHadoopClasses', but it may still be desired
(even though it currently has no users, since it was not working). When Spark
starts to support Hadoop 3, it may be necessary to use that option to
load clients for older Hive metastore versions that don't know about
Hadoop 3.

Tested with an added unit test.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20169 from vanzin/SPARK-17088.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dc4761fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dc4761fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dc4761fd

Branch: refs/heads/master
Commit: dc4761fd8f0eec1d001e53837e65f7c5fe4e248d
Parents: bdebb8e
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Jan 23 12:51:40 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Jan 23 12:51:40 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/client/HiveClientImpl.scala      |  8 +++++---
 .../sql/hive/client/IsolatedClientLoader.scala      | 16 ++++++++++------
 .../spark/sql/hive/client/HiveClientBuilder.scala   |  6 ++++--
 .../spark/sql/hive/client/HiveClientSuite.scala     |  4 ++++
 .../spark/sql/hive/client/HiveVersionSuite.scala    | 11 ++++++++---
 5 files changed, 31 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dc4761fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 4b923f5..39d8390 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.sql.hive.client
 
 import java.io.{File, PrintStream}
-import java.util.Locale
+import java.lang.{Iterable => JIterable}
+import java.util.{Locale, Map => JMap}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -82,8 +83,9 @@ import org.apache.spark.util.{CircularBuffer, Utils}
  */
 private[hive] class HiveClientImpl(
     override val version: HiveVersion,
+    warehouseDir: Option[String],
     sparkConf: SparkConf,
-    hadoopConf: Configuration,
+    hadoopConf: JIterable[JMap.Entry[String, String]],
     extraConfig: Map[String, String],
     initClassLoader: ClassLoader,
     val clientLoader: IsolatedClientLoader)
@@ -130,7 +132,7 @@ private[hive] class HiveClientImpl(
       if (ret != null) {
         // hive.metastore.warehouse.dir is determined in SharedState after the CliSessionState
         // instance constructed, we need to follow that change here.
-        Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname)).foreach { dir =>
+        warehouseDir.foreach { dir =>
           ret.getConf.setVar(ConfVars.METASTOREWAREHOUSE, dir)
         }
         ret

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4761fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index 7a76fd3..dac0e33 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 
 import org.apache.commons.io.{FileUtils, IOUtils}
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmitUtils
@@ -48,11 +49,12 @@ private[hive] object IsolatedClientLoader extends Logging {
       config: Map[String, String] = Map.empty,
       ivyPath: Option[String] = None,
       sharedPrefixes: Seq[String] = Seq.empty,
-      barrierPrefixes: Seq[String] = Seq.empty): IsolatedClientLoader = synchronized {
+      barrierPrefixes: Seq[String] = Seq.empty,
+      sharesHadoopClasses: Boolean = true): IsolatedClientLoader = synchronized {
     val resolvedVersion = hiveVersion(hiveMetastoreVersion)
     // We will first try to share Hadoop classes. If we cannot resolve the Hadoop artifact
     // with the given version, we will use Hadoop 2.6 and then will not share Hadoop classes.
-    var sharesHadoopClasses = true
+    var _sharesHadoopClasses = sharesHadoopClasses
     val files = if (resolvedVersions.contains((resolvedVersion, hadoopVersion))) {
       resolvedVersions((resolvedVersion, hadoopVersion))
     } else {
@@ -68,7 +70,7 @@ private[hive] object IsolatedClientLoader extends Logging {
               "Hadoop classes will not be shared between Spark and Hive metastore client. " +
               "It is recommended to set jars used by Hive metastore client through " +
               "spark.sql.hive.metastore.jars in the production environment.")
-            sharesHadoopClasses = false
+            _sharesHadoopClasses = false
             (downloadVersion(resolvedVersion, "2.6.5", ivyPath), "2.6.5")
         }
       resolvedVersions.put((resolvedVersion, actualHadoopVersion), downloadedFiles)
@@ -81,7 +83,7 @@ private[hive] object IsolatedClientLoader extends Logging {
       execJars = files,
       hadoopConf = hadoopConf,
       config = config,
-      sharesHadoopClasses = sharesHadoopClasses,
+      sharesHadoopClasses = _sharesHadoopClasses,
       sharedPrefixes = sharedPrefixes,
       barrierPrefixes = barrierPrefixes)
   }
@@ -249,8 +251,10 @@ private[hive] class IsolatedClientLoader(
 
   /** The isolated client interface to Hive. */
   private[hive] def createClient(): HiveClient = synchronized {
+    val warehouseDir = Option(hadoopConf.get(ConfVars.METASTOREWAREHOUSE.varname))
     if (!isolationOn) {
-      return new HiveClientImpl(version, sparkConf, hadoopConf, config, baseClassLoader, this)
+      return new HiveClientImpl(version, warehouseDir, sparkConf, hadoopConf, config,
+        baseClassLoader, this)
     }
     // Pre-reflective instantiation setup.
     logDebug("Initializing the logger to avoid disaster...")
@@ -261,7 +265,7 @@ private[hive] class IsolatedClientLoader(
       classLoader
         .loadClass(classOf[HiveClientImpl].getName)
         .getConstructors.head
-        .newInstance(version, sparkConf, hadoopConf, config, classLoader, this)
+        .newInstance(version, warehouseDir, sparkConf, hadoopConf, config, classLoader, this)
         .asInstanceOf[HiveClient]
     } catch {
       case e: InvocationTargetException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4761fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
index ae804ce..ab73f66 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientBuilder.scala
@@ -46,13 +46,15 @@ private[client] object HiveClientBuilder {
   def buildClient(
       version: String,
       hadoopConf: Configuration,
-      extraConf: Map[String, String] = Map.empty): HiveClient = {
+      extraConf: Map[String, String] = Map.empty,
+      sharesHadoopClasses: Boolean = true): HiveClient = {
     IsolatedClientLoader.forVersion(
       hiveMetastoreVersion = version,
       hadoopVersion = VersionInfo.getVersion,
       sparkConf = new SparkConf(),
       hadoopConf = hadoopConf,
       config = buildConf(extraConf),
-      ivyPath = ivyPath).createClient()
+      ivyPath = ivyPath,
+      sharesHadoopClasses = sharesHadoopClasses).createClient()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4761fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index a5dfd89..f991352 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -202,6 +202,10 @@ class HiveClientSuite(version: String)
       day1 :: day2 :: Nil)
   }
 
+  test("create client with sharesHadoopClasses = false") {
+    buildClient(new Configuration(), sharesHadoopClasses = false)
+  }
+
   private def testMetastorePartitionFiltering(
       filterString: String,
       expectedDs: Seq[Int],

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4761fd/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
index bb8a469..a70fb64 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala
@@ -28,7 +28,9 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
   override protected val enableAutoThreadAudit = false
   protected var client: HiveClient = null
 
-  protected def buildClient(hadoopConf: Configuration): HiveClient = {
+  protected def buildClient(
+      hadoopConf: Configuration,
+      sharesHadoopClasses: Boolean = true): HiveClient = {
     // Hive changed the default of datanucleus.schema.autoCreateAll from true to false and
     // hive.metastore.schema.verification from false to true since 2.0
     // For details, see the JIRA HIVE-6113 and HIVE-12463
@@ -36,8 +38,11 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu
       hadoopConf.set("datanucleus.schema.autoCreateAll", "true")
       hadoopConf.set("hive.metastore.schema.verification", "false")
     }
-    HiveClientBuilder
-      .buildClient(version, hadoopConf, HiveUtils.formatTimeVarsForHiveClient(hadoopConf))
+    HiveClientBuilder.buildClient(
+      version,
+      hadoopConf,
+      HiveUtils.formatTimeVarsForHiveClient(hadoopConf),
+      sharesHadoopClasses = sharesHadoopClasses)
   }
 
   override def suiteName: String = s"${super.suiteName}($version)"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org