Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/27 23:31:09 UTC

spark git commit: [SPARK-7853] [SQL] Fixes a class loader issue in Spark SQL

Repository: spark
Updated Branches:
  refs/heads/master b97ddff00 -> db3fd054f


[SPARK-7853] [SQL] Fixes a class loader issue in Spark SQL

This PR is based on PR #6396 authored by chenghao-intel. Essentially, Spark SQL should use the context class loader to load SerDe classes.
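
For illustration only (the variable names below are ours, not from the patch), a minimal sketch of why the context class loader matters here:

```scala
// Jars passed via --jars are appended to the thread context class loader at
// runtime, but not to the loader that defined Spark's own classes. Loading a
// SerDe through the context class loader therefore succeeds where a lookup
// pinned to the defining loader fails with ClassNotFoundException.
val serdeName = "org.apache.hive.hcatalog.data.JsonSerDe"

// May fail for classes that only arrived via --jars:
//   Class.forName(serdeName, true, getClass.getClassLoader)

// Sees the --jars URLs:
val serdeClass =
  Class.forName(serdeName, true, Thread.currentThread().getContextClassLoader)
```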

yhuai helped update the test case, and I fixed a bug in the original `CliSuite`: when testing the CLI tool with `runCliWithin`, we didn't append `\n` to the last query, so the last query was never executed.
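
A condensed sketch of the bug, simplified from the actual `runCliWithin` code (see the `CliSuite` diff below):

```scala
import java.io.ByteArrayInputStream

val queries = Seq("SELECT 1;", "SELECT 2;")

// Before: "SELECT 1;\nSELECT 2;" -- the last query has no trailing newline,
// so the CLI never treats it as entered.
val broken = new ByteArrayInputStream(queries.mkString("\n").getBytes)

// After: "SELECT 1;\nSELECT 2;\n" -- every statement ends with ENTER.
val fixed = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
```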

The original PR description is pasted below.

----

```
bin/spark-sql --jars ./sql/hive/src/test/resources/hive-hcatalog-core-0.13.1.jar
CREATE TABLE t1(a string, b string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
```

This throws an exception like:

```
15/05/26 00:16:33 ERROR SparkSQLDriver: Failed in [CREATE TABLE t1(a string, b string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe']
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Cannot validate serde: org.apache.hive.hcatalog.data.JsonSerDe
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:333)
        at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$runHive$1.apply(ClientWrapper.scala:310)
        at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:139)
        at org.apache.spark.sql.hive.client.ClientWrapper.runHive(ClientWrapper.scala:310)
        at org.apache.spark.sql.hive.client.ClientWrapper.runSqlHive(ClientWrapper.scala:300)
        at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:457)
        at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:68)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:88)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:148)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:87)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:922)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:922)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:727)
        at org.apache.spark.sql.hive.thriftserver.AbstractSparkSQLDriver.run(AbstractSparkSQLDriver.scala:57)
```
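
The fix (see the `HiveContext` diff below) resolves metastore jars through the context class loader and walks the whole loader chain. A standalone sketch of that approach, using an illustrative helper name of our own:

```scala
import java.net.{URL, URLClassLoader}

// Recursively collect jar URLs from a URLClassLoader chain, so that jars
// appended to the context class loader (e.g. via --jars) are also found.
def collectJars(loader: ClassLoader): Array[URL] = loader match {
  case urlLoader: URLClassLoader =>
    urlLoader.getURLs ++ collectJars(urlLoader.getParent)
  case _ => Array.empty[URL] // null parent or non-URL loader: stop here
}

val jars = collectJars(Thread.currentThread().getContextClassLoader)
```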

Author: Cheng Hao <ha...@intel.com>
Author: Cheng Lian <li...@databricks.com>
Author: Yin Huai <yh...@databricks.com>

Closes #6435 from liancheng/classLoader and squashes the following commits:

d4c4845 [Cheng Lian] Fixes CliSuite
75e80e2 [Yin Huai] Update the fix.
fd26533 [Cheng Hao] scalastyle
dd78775 [Cheng Hao] workaround for classloader of IsolatedClientLoader


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db3fd054
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db3fd054
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db3fd054

Branch: refs/heads/master
Commit: db3fd054f240c7e38aba0732e471df65cd14011a
Parents: b97ddff
Author: Cheng Hao <ha...@intel.com>
Authored: Wed May 27 14:21:00 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed May 27 14:21:00 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/thriftserver/CliSuite.scala  | 41 ++++++++++++++++++--
 .../org/apache/spark/sql/hive/HiveContext.scala | 18 +++++++--
 .../org/apache/spark/sql/hive/TableReader.scala |  2 +-
 3 files changed, 53 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/db3fd054/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index b070fa8..cc07db8 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -25,11 +25,15 @@ import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
+import org.scalatest.{BeforeAndAfter, FunSuite}
 
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 
+/**
+ * A test suite for the `spark-sql` CLI tool.  Note that all test cases share the same temporary
+ * Hive metastore and warehouse.
+ */
 class CliSuite extends FunSuite with BeforeAndAfter with Logging {
   val warehousePath = Utils.createTempDir()
   val metastorePath = Utils.createTempDir()
@@ -58,13 +62,13 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
          |  --master local
          |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
          |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
-         |  --driver-class-path ${sys.props("java.class.path")}
        """.stripMargin.split("\\s+").toSeq ++ extraArgs
     }
 
     var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
-    val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
+    // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+    val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
     val buffer = new ArrayBuffer[String]()
     val lock = new Object
 
@@ -124,7 +128,7 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
       "SELECT COUNT(*) FROM hive_test;"
         -> "5",
       "DROP TABLE hive_test;"
-        -> "Time taken: "
+        -> "OK"
     )
   }
 
@@ -151,4 +155,33 @@ class CliSuite extends FunSuite with BeforeAndAfter with Logging {
         -> "hive_test"
     )
   }
+
+  test("Commands using SerDe provided in --jars") {
+    val jarFile =
+      "../hive/src/test/resources/hive-hcatalog-core-0.13.1.jar"
+        .split("/")
+        .mkString(File.separator)
+
+    val dataFilePath =
+      Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
+
+    runCliWithin(1.minute, Seq("--jars", s"$jarFile"))(
+      """CREATE TABLE t1(key string, val string)
+        |ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
+      """.stripMargin
+        -> "OK",
+      "CREATE TABLE sourceTable (key INT, val STRING);"
+        -> "OK",
+      s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE sourceTable;"
+        -> "OK",
+      "INSERT INTO TABLE t1 SELECT key, val FROM sourceTable;"
+        -> "Time taken:",
+      "SELECT count(key) FROM t1;"
+        -> "5",
+      "DROP TABLE t1;"
+        -> "OK",
+      "DROP TABLE sourceTable;"
+        -> "OK"
+    )
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/db3fd054/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index b64768a..9ab98fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
+import java.net.{URL, URLClassLoader}
 import java.sql.Timestamp
 import java.util.{ArrayList => JArrayList}
 
@@ -25,7 +26,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution
 import org.apache.spark.sql.catalyst.ParserDialect
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -188,8 +189,19 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           "Specify a vaild path to the correct hive jars using $HIVE_METASTORE_JARS " +
           s"or change $HIVE_METASTORE_VERSION to $hiveExecutionVersion.")
       }
-      val jars = getClass.getClassLoader match {
-        case urlClassLoader: java.net.URLClassLoader => urlClassLoader.getURLs
+      // We recursively add all jars in the class loader chain,
+      // starting from the given urlClassLoader.
+      def addJars(urlClassLoader: URLClassLoader): Array[URL] = {
+        val jarsInParent = urlClassLoader.getParent match {
+          case parent: URLClassLoader => addJars(parent)
+          case other => Array.empty[URL]
+        }
+
+        urlClassLoader.getURLs ++ jarsInParent
+      }
+
+      val jars = Utils.getContextOrSparkClassLoader match {
+        case urlClassLoader: URLClassLoader => addJars(urlClassLoader)
         case other =>
           throw new IllegalArgumentException(
             "Unable to locate hive jars to connect to metastore " +

http://git-wip-us.apache.org/repos/asf/spark/blob/db3fd054/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 0b6f7a3..294fc3b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -79,7 +79,7 @@ class HadoopTableReader(
     makeRDDForTable(
       hiveTable,
       Class.forName(
-        relation.tableDesc.getSerdeClassName, true, Utils.getSparkClassLoader)
+        relation.tableDesc.getSerdeClassName, true, Utils.getContextOrSparkClassLoader)
         .asInstanceOf[Class[Deserializer]],
       filterOpt = None)
 

