You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/05/01 03:49:09 UTC

spark git commit: [SPARK-5213] [SQL] Pluggable SQL Parser Support

Repository: spark
Updated Branches:
  refs/heads/master e991255e7 -> 3ba5aaab8


[SPARK-5213] [SQL] Pluggable SQL Parser Support

This PR aims to make the SQL parser pluggable, so that users can register their own parser via the Spark SQL CLI.

```
# add the jar into the classpath
$hchengmydesktop:spark>bin/spark-sql --jars sql99.jar

-- switch to "hiveql" dialect
   spark-sql>SET spark.sql.dialect=hiveql;
   spark-sql>SELECT * FROM src LIMIT 1;

-- switch to "sql" dialect
   spark-sql>SET spark.sql.dialect=sql;
   spark-sql>SELECT * FROM src LIMIT 1;

-- switch to a custom dialect
   spark-sql>SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
   spark-sql>SELECT * FROM src LIMIT 1;

-- register a non-existent SQL dialect
   spark-sql> SET spark.sql.dialect=NotExistedClass;
   spark-sql> SELECT * FROM src LIMIT 1;
-- An exception will be thrown and the dialect will revert to the default ("sql" for SQLContext and "hiveql" for HiveContext)
```

Author: Cheng Hao <ha...@intel.com>

Closes #4015 from chenghao-intel/sqlparser and squashes the following commits:

493775c [Cheng Hao] update the code as feedback
81a731f [Cheng Hao] remove the unecessary comment
aab0b0b [Cheng Hao] polish the code a little bit
49b9d81 [Cheng Hao] shrink the comment for rebasing


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ba5aaab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ba5aaab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ba5aaab

Branch: refs/heads/master
Commit: 3ba5aaab8266822545ac82b9e733fd25cc215a77
Parents: e991255
Author: Cheng Hao <ha...@intel.com>
Authored: Thu Apr 30 18:49:06 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Apr 30 18:49:06 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/AbstractSparkSQLParser.scala   | 11 ++-
 .../org/apache/spark/sql/catalyst/Dialect.scala | 33 ++++++++
 .../spark/sql/catalyst/errors/package.scala     |  2 +
 .../scala/org/apache/spark/sql/SQLContext.scala | 82 ++++++++++++++++----
 .../org/apache/spark/sql/sources/ddl.scala      |  6 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 22 ++++++
 .../org/apache/spark/sql/hive/HiveContext.scala | 41 ++++++----
 .../apache/spark/sql/hive/test/TestHive.scala   |  5 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 39 +++++++++-
 9 files changed, 199 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
index 1f3c024..2eb3e16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/AbstractSparkSQLParser.scala
@@ -25,10 +25,6 @@ import scala.util.parsing.input.CharArrayReader.EofCh
 
 import org.apache.spark.sql.catalyst.plans.logical._
 
-private[sql] object KeywordNormalizer {
-  def apply(str: String): String = str.toLowerCase()
-}
-
 private[sql] abstract class AbstractSparkSQLParser
   extends StandardTokenParsers with PackratParsers {
 
@@ -42,7 +38,7 @@ private[sql] abstract class AbstractSparkSQLParser
   }
 
   protected case class Keyword(str: String) {
-    def normalize: String = KeywordNormalizer(str)
+    def normalize: String = lexical.normalizeKeyword(str)
     def parser: Parser[String] = normalize
   }
 
@@ -90,13 +86,16 @@ class SqlLexical extends StdLexical {
     reserved ++= keywords
   }
 
+  /* Normalize the keyword string */
+  def normalizeKeyword(str: String): String = str.toLowerCase
+
   delimiters += (
     "@", "*", "+", "-", "<", "=", "<>", "!=", "<=", ">=", ">", "/", "(", ")",
     ",", ";", "%", "{", "}", ":", "[", "]", ".", "&", "|", "^", "~", "<=>"
   )
 
   protected override def processIdent(name: String) = {
-    val token = KeywordNormalizer(name)
+    val token = normalizeKeyword(name)
     if (reserved contains token) Keyword(token) else Identifier(name)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala
new file mode 100644
index 0000000..9770034
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/Dialect.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+/**
+ * Root class of all SQL parser dialects. Binary compatibility is not
+ * guaranteed across future releases; treat this as an internal
+ * interface for advanced users.
+ *
+ */
+@DeveloperApi
+abstract class Dialect {
+  // The main entry point that each SQL parser dialect must implement.
+  def parse(sqlText: String): LogicalPlan
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
index bdeb660..0fd4f9b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/errors/package.scala
@@ -38,6 +38,8 @@ package object errors {
     }
   }
 
+  class DialectException(msg: String, cause: Throwable) extends Exception(msg, cause)
+
   /**
    *  Wraps any exceptions that are thrown while executing `f` in a
    *  [[catalyst.errors.TreeNodeException TreeNodeException]], attaching the provided `tree`.

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bd4a55f..77f51df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConversions._
 import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
+import scala.util.control.NonFatal
 
 import com.google.common.reflect.TypeToken
 
@@ -32,9 +33,11 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.catalyst.Dialect
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions}
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
@@ -45,6 +48,45 @@ import org.apache.spark.util.Utils
 import org.apache.spark.{Partition, SparkContext}
 
 /**
+ * Currently we support the default dialect named "sql", associated with the class
+ * [[DefaultDialect]]
+ *
+ * And we can also provide custom SQL Dialect, for example in Spark SQL CLI:
+ * {{{
+ *-- switch to "hiveql" dialect
+ *   spark-sql>SET spark.sql.dialect=hiveql;
+ *   spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- switch to "sql" dialect
+ *   spark-sql>SET spark.sql.dialect=sql;
+ *   spark-sql>SELECT * FROM src LIMIT 1;
+ *
+ *-- register the new SQL dialect
+ *   spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- register a non-existent SQL dialect
+ *   spark-sql> SET spark.sql.dialect=NotExistedClass;
+ *   spark-sql> SELECT * FROM src LIMIT 1;
+ *
+ *-- An exception will be thrown and the dialect reverts to
+ *-- "sql" (for SQLContext) or
+ *-- "hiveql" (for HiveContext)
+ * }}}
+ */
+private[spark] class DefaultDialect extends Dialect {
+  @transient
+  protected val sqlParser = {
+    val catalystSqlParser = new catalyst.SqlParser
+    new SparkSQLParser(catalystSqlParser.parse)
+  }
+
+  override def parse(sqlText: String): LogicalPlan = {
+    sqlParser.parse(sqlText)
+  }
+}
+
+/**
  * The entry point for working with structured data (rows and columns) in Spark.  Allows the
  * creation of [[DataFrame]] objects as well as the execution of SQL queries.
  *
@@ -132,17 +174,27 @@ class SQLContext(@transient val sparkContext: SparkContext)
   protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
 
   @transient
-  protected[sql] val ddlParser = new DDLParser(sqlParser.parse(_))
-
-  @transient
-  protected[sql] val sqlParser = {
-    val fallback = new catalyst.SqlParser
-    new SparkSQLParser(fallback.parse(_))
+  protected[sql] val ddlParser = new DDLParser((sql: String) => { getSQLDialect().parse(sql) })
+
+  protected[sql] def getSQLDialect(): Dialect = {
+    try {
+      val clazz = Utils.classForName(dialectClassName)
+      clazz.newInstance().asInstanceOf[Dialect]
+    } catch {
+      case NonFatal(e) =>
+        // Without a usable dialect every statement fails to parse, including
+        // "SET spark.sql.dialect=sql", so reset to the default dialect automatically.
+        val dialect = conf.dialect
+        // reset the sql dialect
+        conf.unsetConf(SQLConf.DIALECT)
+        // throw out the exception, and the default sql dialect will take effect for next query.
+        throw new DialectException(
+          s"""Instantiating dialect '$dialect' failed.
+             |Reverting to default dialect '${conf.dialect}'""".stripMargin, e)
+    }
   }
 
-  protected[sql] def parseSql(sql: String): LogicalPlan = {
-    ddlParser.parse(sql, false).getOrElse(sqlParser.parse(sql))
-  }
+  protected[sql] def parseSql(sql: String): LogicalPlan = ddlParser.parse(sql, false)
 
   protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql))
 
@@ -156,6 +208,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
   @transient
   protected[sql] val defaultSession = createSession()
 
+  protected[sql] def dialectClassName = if (conf.dialect == "sql") {
+    classOf[DefaultDialect].getCanonicalName
+  } else {
+    conf.dialect
+  }
+
   sparkContext.getConf.getAll.foreach {
     case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
     case _ =>
@@ -945,11 +1003,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group basic
    */
   def sql(sqlText: String): DataFrame = {
-    if (conf.dialect == "sql") {
-      DataFrame(this, parseSql(sqlText))
-    } else {
-      sys.error(s"Unsupported SQL dialect: ${conf.dialect}")
-    }
+    DataFrame(this, parseSql(sqlText))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index e7a0685..1abf3aa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -38,12 +38,12 @@ private[sql] class DDLParser(
     parseQuery: String => LogicalPlan)
   extends AbstractSparkSQLParser with DataTypeParser with Logging {
 
-  def parse(input: String, exceptionOnError: Boolean): Option[LogicalPlan] = {
+  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
     try {
-      Some(parse(input))
+      parse(input)
     } catch {
       case ddlException: DDLException => throw ddlException
-      case _ if !exceptionOnError => None
+      case _ if !exceptionOnError => parseQuery(input)
       case x: Throwable => throw x
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9e02e69..255f8c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -19,13 +19,18 @@ package org.apache.spark.sql
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.GeneratedAggregate
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext
 import org.apache.spark.sql.test.TestSQLContext.{udf => _, _}
+
 import org.apache.spark.sql.types._
 
+/** A SQL dialect for testing purposes; it cannot be a nested type. */
+class MyDialect extends DefaultDialect
+
 class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   // Make sure the tables are loaded.
   TestData
@@ -64,6 +69,23 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil)
   }
 
+  test("SQL Dialect Switching to a new SQL parser") {
+    val newContext = new SQLContext(TestSQLContext.sparkContext)
+    newContext.setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
+    assert(newContext.getSQLDialect().getClass === classOf[MyDialect])
+    assert(newContext.sql("SELECT 1").collect() === Array(Row(1)))
+  }
+
+  test("SQL Dialect Switch to an invalid parser with alias") {
+    val newContext = new SQLContext(TestSQLContext.sparkContext)
+    newContext.sql("SET spark.sql.dialect=MyTestClass")
+    intercept[DialectException] {
+      newContext.sql("SELECT 1")
+    }
+    // verify that the dialect was reset to DefaultDialect
+    assert(newContext.getSQLDialect().getClass === classOf[DefaultDialect])
+  }
+
   test("SPARK-4625 support SORT BY in SimpleSQLParser & DSL") {
     checkAnswer(
       sql("SELECT a FROM testData2 SORT BY a"),

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index dd06b26..1d8d0b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.hive
 import java.io.{BufferedReader, InputStreamReader, PrintStream}
 import java.sql.Timestamp
 
+import org.apache.hadoop.hive.ql.parse.VariableSubstitution
+import org.apache.spark.sql.catalyst.Dialect
+
 import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 
@@ -43,6 +46,15 @@ import org.apache.spark.sql.sources.{DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
 /**
+ * The HiveQL dialect; it is tightly bound to HiveContext.
+ */
+private[hive] class HiveQLDialect extends Dialect {
+  override def parse(sqlText: String): LogicalPlan = {
+    HiveQl.parseSql(sqlText)
+  }
+}
+
+/**
  * An instance of the Spark SQL execution engine that integrates with data stored in Hive.
  * Configuration for Hive is read from hive-site.xml on the classpath.
  */
@@ -81,25 +93,16 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   protected[sql] def convertCTAS: Boolean =
     getConf("spark.sql.hive.convertCTAS", "false").toBoolean
 
-  override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
-    new this.QueryExecution(plan)
-
   @transient
-  protected[sql] val ddlParserWithHiveQL = new DDLParser(HiveQl.parseSql(_))
-
-  override def sql(sqlText: String): DataFrame = {
-    val substituted = new VariableSubstitution().substitute(hiveconf, sqlText)
-    // TODO: Create a framework for registering parsers instead of just hardcoding if statements.
-    if (conf.dialect == "sql") {
-      super.sql(substituted)
-    } else if (conf.dialect == "hiveql") {
-      val ddlPlan = ddlParserWithHiveQL.parse(sqlText, exceptionOnError = false)
-      DataFrame(this, ddlPlan.getOrElse(HiveQl.parseSql(substituted)))
-    }  else {
-      sys.error(s"Unsupported SQL dialect: ${conf.dialect}. Try 'sql' or 'hiveql'")
-    }
+  protected[sql] lazy val substitutor = new VariableSubstitution()
+
+  protected[sql] override def parseSql(sql: String): LogicalPlan = {
+    super.parseSql(substitutor.substitute(hiveconf, sql))
   }
 
+  override protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
+    new this.QueryExecution(plan)
+
   /**
    * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a
@@ -356,6 +359,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     }
   }
 
+  override protected[sql] def dialectClassName = if (conf.dialect == "hiveql") {
+    classOf[HiveQLDialect].getCanonicalName
+  } else {
+    super.dialectClassName
+  }
+
   @transient
   private val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 9f17bca..edeab51 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -107,7 +107,10 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     /** Fewer partitions to speed up testing. */
     protected[sql] override lazy val conf: SQLConf = new SQLConf {
       override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
-      override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
+
+      // TODO: unit tests may call conf.clear(), which clears all of the values.
+      // super.getConf(SQLConf.DIALECT) is "sql" by default, so we need to set it to "hiveql".
+      override def dialect: String = super.getConf(SQLConf.DIALECT, "hiveql")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3ba5aaab/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 4f8d0ac..630dec8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,14 +18,17 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
-import org.apache.spark.sql.hive.{MetastoreRelation, HiveShim}
+import org.apache.spark.sql.catalyst.errors.DialectException
+import org.apache.spark.sql.DefaultDialect
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql.hive.MetastoreRelation
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
 
 case class Nested1(f1: Nested2)
 case class Nested2(f2: Nested3)
@@ -45,6 +48,9 @@ case class Order(
     state: String,
     month: Int)
 
+/** A SQL dialect for testing purposes; it cannot be a nested type. */
+class MyDialect extends DefaultDialect
+
 /**
  * A collection of hive query tests where we generate the answers ourselves instead of depending on
  * Hive to generate them (in contrast to HiveQuerySuite).  Often this is because the query is
@@ -229,6 +235,35 @@ class SQLQuerySuite extends QueryTest {
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }
 
+  test("SQL Dialect Switching") {
+    assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+    setConf("spark.sql.dialect", classOf[MyDialect].getCanonicalName())
+    assert(getSQLDialect().getClass === classOf[MyDialect])
+    assert(sql("SELECT 1").collect() === Array(Row(1)))
+
+    // switch to the built-in "sql" dialect (DefaultDialect), then back to "hiveql"
+    sql("SET spark.sql.dialect=sql")
+    assert(getSQLDialect().getClass === classOf[DefaultDialect])
+    sql("SET spark.sql.dialect=hiveql")
+    assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+
+    // set an invalid dialect: "abc" is not a loadable class name
+    sql("SET spark.sql.dialect.abc=MyTestClass")
+    sql("SET spark.sql.dialect=abc")
+    intercept[Exception] {
+      sql("SELECT 1")
+    }
+    // verify that the failed lookup reset the dialect to HiveQLDialect
+    assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+
+    sql("SET spark.sql.dialect=MyTestClass")
+    intercept[DialectException] {
+      sql("SELECT 1")
+    }
+    // verify that the failed lookup reset the dialect to HiveQLDialect
+    assert(getSQLDialect().getClass === classOf[HiveQLDialect])
+  }
+
   test("CTAS with serde") {
     sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect()
     sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org