You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/11 05:33:51 UTC
spark git commit: [SPARK-19157][SQL] should be able to change
spark.sql.runSQLOnFiles at runtime
Repository: spark
Updated Branches:
refs/heads/master bc6c56e94 -> 3b19c74e7
[SPARK-19157][SQL] should be able to change spark.sql.runSQLOnFiles at runtime
## What changes were proposed in this pull request?
The analyzer rule that supports querying files directly is added to `Analyzer.extendedResolutionRules` only if the `spark.sql.runSQLOnFiles` flag is on when the `SparkSession` is created. If the flag is off at that point, the rule is never added, and files cannot be queried directly even if the flag is turned on later.
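This PR fixes the bug by always adding that rule to `Analyzer.extendedResolutionRules`. The rule itself now checks `spark.sql.runSQLOnFiles` each time it runs, so the flag can be changed at runtime.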
## How was this patch tested?
A new regression test was added to `SQLQuerySuite`.
Author: Wenchen Fan <we...@databricks.com>
Closes #16531 from cloud-fan/sql-on-files.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3b19c74e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3b19c74e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3b19c74e
Branch: refs/heads/master
Commit: 3b19c74e71fd6af18047747843e962b5401db4d9
Parents: bc6c56e
Author: Wenchen Fan <we...@databricks.com>
Authored: Tue Jan 10 21:33:44 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Jan 10 21:33:44 2017 -0800
----------------------------------------------------------------------
.../spark/sql/execution/datasources/rules.scala | 21 ++++++++++--------
.../spark/sql/internal/SessionState.scala | 2 +-
.../org/apache/spark/sql/SQLQuerySuite.scala | 23 ++++++++++++++++++++
.../spark/sql/hive/HiveSessionState.scala | 2 +-
4 files changed, 37 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 94ba814..5ca8226 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -35,27 +35,30 @@ import org.apache.spark.sql.types.{AtomicType, StructType}
* Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
*/
class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+ private def maybeSQLFile(u: UnresolvedRelation): Boolean = {
+ sparkSession.sessionState.conf.runSQLonFile && u.tableIdentifier.database.isDefined
+ }
+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+ case u: UnresolvedRelation if maybeSQLFile(u) =>
try {
val dataSource = DataSource(
sparkSession,
paths = u.tableIdentifier.table :: Nil,
className = u.tableIdentifier.database.get)
- val notSupportDirectQuery = try {
- !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
- } catch {
- case NonFatal(e) => false
- }
- if (notSupportDirectQuery) {
+ // `dataSource.providingClass` may throw ClassNotFoundException, then the outer try-catch
+ // will catch it and return the original plan, so that the analyzer can report table not
+ // found later.
+ val isFileFormat = classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+ if (!isFileFormat) {
throw new AnalysisException("Unsupported data source type for direct query on files: " +
s"${u.tableIdentifier.database.get}")
}
val plan = LogicalRelation(dataSource.resolveRelation())
- u.alias.map(a => SubqueryAlias(u.alias.get, plan, None)).getOrElse(plan)
+ u.alias.map(a => SubqueryAlias(a, plan, None)).getOrElse(plan)
} catch {
- case e: ClassNotFoundException => u
+ case _: ClassNotFoundException => u
case e: Exception =>
// the provider is valid, but failed to create a logical plan
u.failAnalysis(e.getMessage)
http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 8759dfe..c9075ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -117,7 +117,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
PreprocessTableInsertion(conf) ::
new FindDataSourceTable(sparkSession) ::
DataSourceAnalysis(conf) ::
- (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
+ new ResolveDataSource(sparkSession) :: Nil
override val extendedCheckRules =
Seq(PreWriteCheck(conf, catalog), HiveOnlyCheck)
http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e89599b..563d068 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2476,4 +2476,27 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
assert(sql("SELECT * FROM array_tbl where arr = ARRAY(1L)").count == 1)
}
}
+
+ test("SPARK-19157: should be able to change spark.sql.runSQLOnFiles at runtime") {
+ withTempPath { path =>
+ Seq(1 -> "a").toDF("i", "j").write.parquet(path.getCanonicalPath)
+
+ val newSession = spark.newSession()
+ val originalValue = newSession.sessionState.conf.runSQLonFile
+
+ try {
+ newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, false)
+ intercept[AnalysisException] {
+ newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`")
+ }
+
+ newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, true)
+ checkAnswer(
+ newSession.sql(s"SELECT i, j FROM parquet.`${path.getCanonicalPath}`"),
+ Row(1, "a"))
+ } finally {
+ newSession.sessionState.conf.setConf(SQLConf.RUN_SQL_ON_FILES, originalValue)
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3b19c74e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 52892f1..aebee85 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -65,7 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
new DetermineHiveSerde(conf) ::
- (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
+ new ResolveDataSource(sparkSession) :: Nil
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org