You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/02/23 03:13:39 UTC

spark git commit: [SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec

Repository: spark
Updated Branches:
  refs/heads/master a11b39951 -> 5d80fac58


[SPARK-11624][SPARK-11972][SQL] fix commands that need hive to exec

In SparkSQLCLI, we have created a `CliSessionState`, but then we call `SparkSQLEnv.init()`, which will start another `SessionState`. This leads to an exception because `processCmd` needs to get the `CliSessionState` instance by calling `SessionState.get()`, but the return value would be an instance of `SessionState`. See the exception below.

spark-sql> !echo "test";
Exception in thread "main" java.lang.ClassCastException: org.apache.hadoop.hive.ql.session.SessionState cannot be cast to org.apache.hadoop.hive.cli.CliSessionState
	at org.apache.hadoop.hive.cli.CliDriver.processCmd(CliDriver.java:112)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:301)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:242)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:691)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Author: Daoyuan Wang <da...@intel.com>

Closes #9589 from adrian-wang/clicommand.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d80fac5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d80fac5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d80fac5

Branch: refs/heads/master
Commit: 5d80fac58f837933b5359a8057676f45539e53af
Parents: a11b399
Author: Daoyuan Wang <da...@intel.com>
Authored: Mon Feb 22 18:13:32 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Feb 22 18:13:32 2016 -0800

----------------------------------------------------------------------
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |  7 ++-
 .../spark/sql/hive/thriftserver/CliSuite.scala  |  5 ++
 sql/hive/pom.xml                                |  6 +++
 .../spark/sql/hive/client/HiveClientImpl.scala  | 54 ++++++++++++--------
 4 files changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d80fac5/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index f279b78..fb31119 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -288,8 +288,11 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
     val tokens: Array[String] = cmd_trimmed.split("\\s+")
     val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
     if (cmd_lower.equals("quit") ||
-      cmd_lower.equals("exit") ||
-      tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
+      cmd_lower.equals("exit")) {
+      sessionState.close()
+      System.exit(0)
+    }
+    if (tokens(0).toLowerCase(Locale.ENGLISH).equals("source") ||
       cmd_trimmed.startsWith("!") ||
       tokens(0).toLowerCase.equals("list") ||
       isRemoteMode) {

http://git-wip-us.apache.org/repos/asf/spark/blob/5d80fac5/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 72da266..81508e1 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -234,4 +234,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
         -> "Error in query: Table not found: nonexistent_table;"
     )
   }
+
+  test("SPARK-11624 Spark SQL CLI should set sessionState only once") {
+    runCliWithin(2.minute, Seq("-e", "!echo \"This is a test for Spark-11624\";"))(
+      "" -> "This is a test for Spark-11624")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5d80fac5/sql/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 14cf9ac..22bad93 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -72,6 +72,12 @@
       <artifactId>protobuf-java</artifactId>
       <version>${protobuf.version}</version>
     </dependency>
+-->
+    <dependency>
+      <groupId>${hive.group}</groupId>
+      <artifactId>hive-cli</artifactId>
+    </dependency>
+<!--
     <dependency>
       <groupId>${hive.group}</groupId>
       <artifactId>hive-common</artifactId>

http://git-wip-us.apache.org/repos/asf/spark/blob/5d80fac5/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 7a007d2..5d62854 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.reflectiveCalls
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.cli.CliSessionState
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
 import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema, Function => HiveFunction, FunctionType, PrincipalType, ResourceUri}
@@ -31,7 +32,6 @@ import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Tab
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
 import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{Logging, SparkConf, SparkException}
@@ -105,29 +105,39 @@ private[hive] class HiveClientImpl(
     }
 
     val ret = try {
-      val initialConf = new HiveConf(classOf[SessionState])
-      // HiveConf is a Hadoop Configuration, which has a field of classLoader and
-      // the initial value will be the current thread's context class loader
-      // (i.e. initClassLoader at here).
-      // We call initialConf.setClassLoader(initClassLoader) at here to make
-      // this action explicit.
-      initialConf.setClassLoader(initClassLoader)
-      config.foreach { case (k, v) =>
-        if (k.toLowerCase.contains("password")) {
-          logDebug(s"Hive Config: $k=xxx")
-        } else {
-          logDebug(s"Hive Config: $k=$v")
+      // originState will be created if not exists, will never be null
+      val originalState = SessionState.get()
+      if (originalState.isInstanceOf[CliSessionState]) {
+        // In `SparkSQLCLIDriver`, we have already started a `CliSessionState`,
+        // which contains information like configurations from command line. Later
+        // we call `SparkSQLEnv.init()` there, which would run into this part again.
+        // so we should keep `conf` and reuse the existing instance of `CliSessionState`.
+        originalState
+      } else {
+        val initialConf = new HiveConf(classOf[SessionState])
+        // HiveConf is a Hadoop Configuration, which has a field of classLoader and
+        // the initial value will be the current thread's context class loader
+        // (i.e. initClassLoader at here).
+        // We call initialConf.setClassLoader(initClassLoader) at here to make
+        // this action explicit.
+        initialConf.setClassLoader(initClassLoader)
+        config.foreach { case (k, v) =>
+          if (k.toLowerCase.contains("password")) {
+            logDebug(s"Hive Config: $k=xxx")
+          } else {
+            logDebug(s"Hive Config: $k=$v")
+          }
+          initialConf.set(k, v)
         }
-        initialConf.set(k, v)
-      }
-      val state = new SessionState(initialConf)
-      if (clientLoader.cachedHive != null) {
-        Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+        val state = new SessionState(initialConf)
+        if (clientLoader.cachedHive != null) {
+          Hive.set(clientLoader.cachedHive.asInstanceOf[Hive])
+        }
+        SessionState.start(state)
+        state.out = new PrintStream(outputBuffer, true, "UTF-8")
+        state.err = new PrintStream(outputBuffer, true, "UTF-8")
+        state
       }
-      SessionState.start(state)
-      state.out = new PrintStream(outputBuffer, true, "UTF-8")
-      state.err = new PrintStream(outputBuffer, true, "UTF-8")
-      state
     } finally {
       Thread.currentThread().setContextClassLoader(original)
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org