You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by su...@apache.org on 2023/02/27 22:59:00 UTC

[spark] branch branch-3.4 updated: [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client

This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 40a4019dfc5 [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client
40a4019dfc5 is described below

commit 40a4019dfc57f48c95b7f32c63b2b2dfd3d1c58a
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Mon Feb 27 14:57:52 2023 -0800

    [SPARK-42539][SQL][HIVE] Eliminate separate classloader when using 'builtin' Hive version for metadata client
    
    ### What changes were proposed in this pull request?
    When using the 'builtin' Hive version for the Hive metadata client, do not create a separate classloader; instead, continue to use the overall user/application classloader (regardless of Java version). This standardizes the behavior across all Java versions to match that of Java 9+. See SPARK-42539 for more details on why this approach was chosen.
    
    ### Why are the changes needed?
    Please see a much more detailed description in SPARK-42539. The tl;dr is that user-provided JARs (such as `hive-exec-2.3.8.jar`) take precedence over Spark/system JARs when constructing the classloader used by `IsolatedClientLoader` on Java 8 in 'builtin' mode, which can cause unexpected behavior and/or breakages. This violates the expectation that, unless user-first classloader mode is used, Spark JARs should be prioritized over user JARs. It also seems that this separate classloader [...]
    > attempt to discover the jars that were used to load Spark SQL and use those. This option is only valid when using the execution version of Hive.
    
    I can't follow the logic here; the user classloader clearly has all of the necessary Hive JARs, since that's where we're getting the JAR URLs from, so we could just use that directly instead of grabbing the URLs. When this was initially added, it only used the JARs from the user classloader, not any of its parents, which I suspect was the motivating factor (to try to avoid more Spark classes being duplicated inside of the isolated classloader, I guess). But that was changed a month la [...]
    
    ### Does this PR introduce _any_ user-facing change?
    No, except to protect Spark itself from potentially being broken by bad user JARs.
    
    ### How was this patch tested?
    This includes a new unit test in `HiveUtilsSuite` which demonstrates the issue and shows that this approach resolves it. It has also been tested on a live cluster running Java 8, and Hive communication functionality continues to work as expected.
    
    Closes #40144 from xkrogen/xkrogen/SPARK-42539/hive-isolatedclientloader-builtin-user-jar-conflict-fix/java9strategy.
    
    Authored-by: Erik Krogen <xk...@apache.org>
    Signed-off-by: Chao Sun <su...@apple.com>
---
 .../main/scala/org/apache/spark/TestUtils.scala    |  5 +-
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 53 +-------------
 .../sql/hive/client/IsolatedClientLoader.scala     | 83 ++++++++++------------
 .../org/apache/spark/sql/hive/HiveUtilsSuite.scala | 34 ++++++++-
 4 files changed, 78 insertions(+), 97 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index bdf81d22efa..13ae6aca38b 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -193,12 +193,15 @@ private[spark] object TestUtils {
       baseClass: String = null,
       classpathUrls: Seq[URL] = Seq.empty,
       implementsClasses: Seq[String] = Seq.empty,
-      extraCodeBody: String = ""): File = {
+      extraCodeBody: String = "",
+      packageName: Option[String] = None): File = {
     val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
     val implementsText =
       "implements " + (implementsClasses :+ "java.io.Serializable").mkString(", ")
+    val packageText = packageName.map(p => s"package $p;\n").getOrElse("")
     val sourceFile = new JavaSourceFromString(className,
       s"""
+         |$packageText
          |public class $className $extendsText $implementsText {
          |  @Override public String toString() { return "$toStringValue"; }
          |
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index fe9bdef3d0e..1a0cac42fa7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
-import java.net.{URL, URLClassLoader}
+import java.net.URL
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
@@ -26,11 +26,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.util.VersionInfo
 import org.apache.hive.common.util.HiveVersionInfo
 
@@ -46,7 +44,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf._
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{ChildFirstURLClassLoader, Utils}
+import org.apache.spark.util.Utils
 
 
 private[spark] object HiveUtils extends Logging {
@@ -321,22 +319,6 @@ private[spark] object HiveUtils extends Logging {
     (commonTimeVars ++ hardcodingTimeVars).toMap
   }
 
-  /**
-   * Check current Thread's SessionState type
-   * @return true when SessionState.get returns an instance of CliSessionState,
-   *         false when it gets non-CliSessionState instance or null
-   */
-  def isCliSessionState(): Boolean = {
-    val state = SessionState.get
-    var temp: Class[_] = if (state != null) state.getClass else null
-    var found = false
-    while (temp != null && !found) {
-      found = temp.getName == "org.apache.hadoop.hive.cli.CliSessionState"
-      temp = temp.getSuperclass
-    }
-    found
-  }
-
   /**
    * Create a [[HiveClient]] used for execution.
    *
@@ -409,43 +391,14 @@ private[spark] object HiveUtils extends Logging {
             s"or change ${HIVE_METASTORE_VERSION.key} to $builtinHiveVersion.")
       }
 
-      // We recursively find all jars in the class loader chain,
-      // starting from the given classLoader.
-      def allJars(classLoader: ClassLoader): Array[URL] = classLoader match {
-        case null => Array.empty[URL]
-        case childFirst: ChildFirstURLClassLoader =>
-          childFirst.getURLs() ++ allJars(Utils.getSparkClassLoader)
-        case urlClassLoader: URLClassLoader =>
-          urlClassLoader.getURLs ++ allJars(urlClassLoader.getParent)
-        case other => allJars(other.getParent)
-      }
-
-      val classLoader = Utils.getContextOrSparkClassLoader
-      val jars: Array[URL] = if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-        // Do nothing. The system classloader is no longer a URLClassLoader in Java 9,
-        // so it won't match the case in allJars. It no longer exposes URLs of
-        // the system classpath
-        Array.empty[URL]
-      } else {
-        val loadedJars = allJars(classLoader)
-        // Verify at least one jar was found
-        if (loadedJars.length == 0) {
-          throw new IllegalArgumentException(
-            "Unable to locate hive jars to connect to metastore. " +
-              s"Please set ${HIVE_METASTORE_JARS.key}.")
-        }
-        loadedJars
-      }
-
       logInfo(
         s"Initializing HiveMetastoreConnection version $hiveMetastoreVersion using Spark classes.")
       new IsolatedClientLoader(
         version = metaVersion,
         sparkConf = conf,
         hadoopConf = hadoopConf,
-        execJars = jars.toSeq,
         config = configurations,
-        isolationOn = !isCliSessionState(),
+        isolationOn = false,
         barrierPrefixes = hiveMetastoreBarrierPrefixes,
         sharedPrefixes = hiveMetastoreSharedPrefixes)
     } else if (hiveMetastoreJars == "maven") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
index e65e6d42937..879b2451cae 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala
@@ -232,51 +232,46 @@ private[hive] class IsolatedClientLoader(
   private[hive] val classLoader: MutableURLClassLoader = {
     val isolatedClassLoader =
       if (isolationOn) {
-        if (allJars.isEmpty) {
-          // See HiveUtils; this is the Java 9+ + builtin mode scenario
-          baseClassLoader
-        } else {
-          val rootClassLoader: ClassLoader =
-            if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-              // In Java 9, the boot classloader can see few JDK classes. The intended parent
-              // classloader for delegation is now the platform classloader.
-              // See http://java9.wtf/class-loading/
-              val platformCL =
-              classOf[ClassLoader].getMethod("getPlatformClassLoader").
-                invoke(null).asInstanceOf[ClassLoader]
-              // Check to make sure that the root classloader does not know about Hive.
-              assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
-              platformCL
+        val rootClassLoader: ClassLoader =
+          if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
+            // In Java 9, the boot classloader can see few JDK classes. The intended parent
+            // classloader for delegation is now the platform classloader.
+            // See http://java9.wtf/class-loading/
+            val platformCL =
+            classOf[ClassLoader].getMethod("getPlatformClassLoader").
+              invoke(null).asInstanceOf[ClassLoader]
+            // Check to make sure that the root classloader does not know about Hive.
+            assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
+            platformCL
+          } else {
+            // The boot classloader is represented by null (the instance itself isn't accessible)
+            // and before Java 9 can see all JDK classes
+            null
+          }
+        new URLClassLoader(allJars, rootClassLoader) {
+          override def loadClass(name: String, resolve: Boolean): Class[_] = {
+            val loaded = findLoadedClass(name)
+            if (loaded == null) doLoadClass(name, resolve) else loaded
+          }
+          def doLoadClass(name: String, resolve: Boolean): Class[_] = {
+            val classFileName = name.replaceAll("\\.", "/") + ".class"
+            if (isBarrierClass(name)) {
+              // For barrier classes, we construct a new copy of the class.
+              val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
+              logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
+              defineClass(name, bytes, 0, bytes.length)
+            } else if (!isSharedClass(name)) {
+              logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
+              super.loadClass(name, resolve)
             } else {
-              // The boot classloader is represented by null (the instance itself isn't accessible)
-              // and before Java 9 can see all JDK classes
-              null
-            }
-          new URLClassLoader(allJars, rootClassLoader) {
-            override def loadClass(name: String, resolve: Boolean): Class[_] = {
-              val loaded = findLoadedClass(name)
-              if (loaded == null) doLoadClass(name, resolve) else loaded
-            }
-            def doLoadClass(name: String, resolve: Boolean): Class[_] = {
-              val classFileName = name.replaceAll("\\.", "/") + ".class"
-              if (isBarrierClass(name)) {
-                // For barrier classes, we construct a new copy of the class.
-                val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
-                logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
-                defineClass(name, bytes, 0, bytes.length)
-              } else if (!isSharedClass(name)) {
-                logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
-                super.loadClass(name, resolve)
-              } else {
-                // For shared classes, we delegate to baseClassLoader, but fall back in case the
-                // class is not found.
-                logDebug(s"shared class: $name")
-                try {
-                  baseClassLoader.loadClass(name)
-                } catch {
-                  case _: ClassNotFoundException =>
-                    super.loadClass(name, resolve)
-                }
+              // For shared classes, we delegate to baseClassLoader, but fall back in case the
+              // class is not found.
+              logDebug(s"shared class: $name")
+              try {
+                baseClassLoader.loadClass(name)
+              } catch {
+                case _: ClassNotFoundException =>
+                  super.loadClass(name, resolve)
               }
             }
           }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
index d8e1e012928..823ac8ed957 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveUtilsSuite.scala
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.catalog.CatalogDatabase
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.util.ChildFirstURLClassLoader
+import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader}
 
 class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
@@ -77,6 +81,32 @@ class HiveUtilsSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     }
   }
 
+  test("SPARK-42539: User-provided JARs should not take precedence over builtin Hive JARs") {
+    withTempDir { tmpDir =>
+        val classFile = TestUtils.createCompiledClass(
+          "Hive", tmpDir, packageName = Some("org.apache.hadoop.hive.ql.metadata"))
+
+      val jarFile = new File(tmpDir, "hive-fake.jar")
+      TestUtils.createJar(Seq(classFile), jarFile, Some("org/apache/hadoop/hive/ql/metadata"))
+
+      val conf = new SparkConf
+      val contextClassLoader = Thread.currentThread().getContextClassLoader
+      val loader = new MutableURLClassLoader(Array(jarFile.toURI.toURL), contextClassLoader)
+      try {
+        Thread.currentThread().setContextClassLoader(loader)
+        val client = HiveUtils.newClientForMetadata(
+          conf,
+          SparkHadoopUtil.newConfiguration(conf),
+          HiveUtils.newTemporaryConfiguration(useInMemoryDerby = true))
+        client.createDatabase(
+          CatalogDatabase("foo", "", URI.create(s"file://${tmpDir.getAbsolutePath}/foo.db"), Map()),
+          ignoreIfExists = true)
+      } finally {
+        Thread.currentThread().setContextClassLoader(contextClassLoader)
+      }
+    }
+  }
+
   test("SPARK-27349: Dealing with TimeVars removed in Hive 2.x") {
     // Test default value
     val defaultConf = new Configuration


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org