Posted to commits@spark.apache.org by li...@apache.org on 2017/01/11 05:33:51 UTC

spark git commit: [SPARK-19157][SQL] should be able to change spark.sql.runSQLOnFiles at runtime

Repository: spark
Updated Branches:
  refs/heads/master bc6c56e94 -> 3b19c74e7


[SPARK-19157][SQL] should be able to change spark.sql.runSQLOnFiles at runtime

## What changes were proposed in this pull request?

The analyzer rule that supports querying files directly is added to `Analyzer.extendedResolutionRules` when the `SparkSession` is created, depending on the `spark.sql.runSQLOnFiles` flag. If the flag is off when we create the `SparkSession`, the rule is never added, and we cannot query files directly even if we turn the flag on later.
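
A minimal sketch of the broken behavior before this patch (the `local` master and the Parquet path are hypothetical):

    import org.apache.spark.sql.SparkSession

    // The flag is off when the session is built, so the ResolveDataSource
    // rule is never added to the analyzer's extendedResolutionRules.
    val spark = SparkSession.builder()
      .master("local")
      .config("spark.sql.runSQLOnFiles", "false")
      .getOrCreate()

    // Turning the flag on later has no effect: the rule is still missing,
    // so the direct file query below fails to resolve.
    spark.conf.set("spark.sql.runSQLOnFiles", "true")
    spark.sql("SELECT * FROM parquet.`/tmp/people.parquet`").show()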

This PR fixes the bug by always adding that rule to `Analyzer.extendedResolutionRules` and checking the flag inside the rule itself.
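
The key change is that the flag is now consulted every time the rule fires, not once at session creation. A condensed sketch of the guard (it mirrors the `maybeSQLFile` helper in the diff below):

    // Inside ResolveDataSource: the rule is always registered, and the conf
    // is read per invocation, so runtime changes to spark.sql.runSQLOnFiles
    // take effect immediately.
    private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
      sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
    }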

## How was this patch tested?

A new regression test in `SQLQuerySuite` (see the diff below).

Author: Wenchen Fan <we...@databricks.com>

Closes #16531 from cloud-fan/sql-on-files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b19c74e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b19c74e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b19c74e

Branch: refs/heads/master
Commit: 3b19c74e71fd6af18047747843e962b5401db4d9
Parents: bc6c56e
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Jan 10 21:33:44 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 10 21:33:44 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/datasources/rules.scala | 21 ++++++++++--------
 .../spark/sql/internal/SessionState.scala       |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 23 ++++++++++++++++++++
 .../spark/sql/hive/HiveSessionState.scala       |  2 +-
 4 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 94ba814..5ca8226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -35,27 +35,30 @@ import org.apache.spark.sql.types.{AtomicType, StructType}
  * Try to replace [[UnresolvedRelation]]s with [[ResolveDataSource]].
  */
 class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+  private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
+    sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+    case u: UnresolvedRelation if maybeSQLFile(u) =>
       try {
         val dataSource = DataSource(
           sparkSession,
           paths = u.tableIdentifier.table :: Nil,
           className = u.tableIdentifier.database.get)
 
-        val notSupportDirectQuery = try {
-          !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
-        } catch {
-          case NonFatal(e) => false
-        }
-        if (notSupportDirectQuery) {
+        // `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch
+        // will catch it and return the original plan, so that the analyzer can report table not
+        // found later.
+        val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+        if (!isFileFormat) {
           throw new AnalysisException("Unsupported data source type for direct query on files: " +
             s"${u.tableIdentifier.database.get}")
         }
         val plan = LogicalRelation(dataSource.resolveRelation())
-        u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan)
+        u.alias.map(a => SubqueryAlias(a, plan, None)).getOrElse(plan)
       } catch {
-        case e: ClassNotFoundException => u
+        case _: ClassNotFoundException => u
         case e: Exception =>
           // the provider is valid, but failed to create a logical plan
           u.failAnalysis(e.getMessage)
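
For context, `ResolveDataSource` is the rule behind the direct-file-query syntax: the "database" part of the identifier names the data source and the "table" part carries the path, as the `DataSource(...)` construction above shows. A usage sketch (the paths are hypothetical):

    // parquet.`/tmp/people.parquet` is parsed as an identifier with
    // database = "parquet" (used as the DataSource class name) and
    // table    = "/tmp/people.parquet" (used as the path to load).
    spark.sql("SELECT * FROM parquet.`/tmp/people.parquet`")
    spark.sql("SELECT name FROM json.`/tmp/people.json`")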

http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8759dfe..c9075ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -117,7 +117,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
         PreprocessTableInsertion(conf) ::
         new FindDataSourceTable(sparkSession) ::
         DataSourceAnalysis(conf) ::
-        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
+        new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules =
         Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)

http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e89599b..563d068 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2476,4 +2476,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       assert(sql("SELECT * FROM array_tbl where arr = ARRAY(1L)").count == 1)
     }
   }
+
+  test("SPARK-19157: should be able to change spark.sql.runSQLOnFiles at runtime") {
+    withTempPath { path =>
+      Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
+
+      val newSession = spark.newSession()
+      val originalValue = newSession.sessionState.conf.runSQLonFile
+
+      try {
+        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
+        intercept[AnalysisException] {
+          newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`")
+        }
+
+        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
+        checkAnswer(
+          newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"),
+          Row(1, "a"))
+      } finally {
+        newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 52892f1..aebee85 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -65,7 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
-        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
+        new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
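
With the rule registered unconditionally in both session states, toggling the flag at runtime now behaves as expected. A short sketch against an existing session (the path is hypothetical):

    // Off: maybeSQLFile returns false, the relation stays unresolved, and
    // the analyzer reports the "table" as not found.
    spark.conf.set("spark.sql.runSQLOnFiles", "false")
    // spark.sql("SELECT * FROM parquet.`/tmp/t`")  // AnalysisException

    // On: the same query resolves without creating a new session.
    spark.conf.set("spark.sql.runSQLOnFiles", "true")
    spark.sql("SELECT * FROM parquet.`/tmp/t`").show()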

