Posted to commits@spark.apache.org by ma...@apache.org on 2015/10/21 22:38:35 UTC

spark git commit: [SPARK-11197][SQL] run SQL on files directly

Repository: spark
Updated Branches:
  refs/heads/master 7c74ebca0 -> f8c6bec65


[SPARK-11197][SQL] run SQL on files directly

This PR introduces a new feature to run SQL directly on files without creating a table, for example:

```
select id from json.`path/to/json/files` as j
```
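
The identifier before the dot names the data source provider (a short name such as `json`, or a fully qualified one such as `org.apache.spark.sql.json`; both forms are exercised by the tests below), and the backtick-quoted part is the path. A minimal Scala sketch of the same query, assuming an existing `sqlContext` and JSON data at the hypothetical path `/tmp/people`:

```
// A sketch, not part of this commit: assumes a running SQLContext bound to
// `sqlContext` and JSON files at the hypothetical path /tmp/people.
// The `json` prefix selects the data source; the backticks quote the path.
val people = sqlContext.sql("select id from json.`/tmp/people` as j")
people.show()
```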

Author: Davies Liu <da...@databricks.com>

Closes #9173 from davies/source.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8c6bec6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8c6bec6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8c6bec6

Branch: refs/heads/master
Commit: f8c6bec65784de89b47e96a367d3f9790c1b3115
Parents: 7c74ebc
Author: Davies Liu <da...@databricks.com>
Authored: Wed Oct 21 13:38:30 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Oct 21 13:38:30 2015 -0700

----------------------------------------------------------------------
 R/pkg/inst/tests/test_sparkSQL.R                |  2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 10 +++++--
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  3 +++
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  8 ++++++
 .../scala/org/apache/spark/sql/SQLContext.scala |  2 +-
 .../spark/sql/execution/datasources/rules.scala | 28 ++++++++++++++++++--
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 28 ++++++++++++++++++++
 .../org/apache/spark/sql/hive/HiveContext.scala |  4 +--
 .../sql/hive/execution/SQLQuerySuite.scala      | 13 +++++++++
 10 files changed, 91 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/R/pkg/inst/tests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/test_sparkSQL.R b/R/pkg/inst/tests/test_sparkSQL.R
index e1b42b0..67d8b23 100644
--- a/R/pkg/inst/tests/test_sparkSQL.R
+++ b/R/pkg/inst/tests/test_sparkSQL.R
@@ -1428,7 +1428,7 @@ test_that("sampleBy() on a DataFrame", {
 
 test_that("SQL error message is returned from JVM", {
   retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e)
-  expect_equal(grepl("Table Not Found: blah", retError), TRUE)
+  expect_equal(grepl("Table not found: blah", retError), TRUE)
 })
 
 test_that("Method as.data.frame as a synonym for collect()", {

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 016dc29..beabacf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -256,7 +256,7 @@ class Analyzer(
         catalog.lookupRelation(u.tableIdentifier, u.alias)
       } catch {
         case _: NoSuchTableException =>
-          u.failAnalysis(s"Table Not Found: ${u.tableName}")
+          u.failAnalysis(s"Table not found: ${u.tableName}")
       }
     }
 
@@ -264,7 +264,13 @@ class Analyzer(
       case i @ InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
         i.copy(table = EliminateSubQueries(getTable(u)))
       case u: UnresolvedRelation =>
-        getTable(u)
+        try {
+          getTable(u)
+        } catch {
+          case _: AnalysisException if u.tableIdentifier.database.isDefined =>
+            // delay the exception until CheckAnalysis, so the relation can still be resolved as a data source.
+            u
+        }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7701fd0..ab21540 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -49,6 +49,9 @@ trait CheckAnalysis {
     plan.foreachUp {
       case p if p.analyzed => // Skip already analyzed sub-plans
 
+      case u: UnresolvedRelation =>
+        u.failAnalysis(s"Table not found: ${u.tableIdentifier}")
+
       case operator: LogicalPlan =>
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
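
With this check, any relation that is still unresolved after analysis (including a two-part identifier that no data source could handle) surfaces as a "Table not found" error. A sketch of what a caller sees, assuming an existing `sqlContext`; the table name is hypothetical and mirrors the test added below:

```
// A sketch, not part of this commit: an unresolvable two-part identifier
// now fails in CheckAnalysis with "Table not found".
try {
  sqlContext.sql("select * from no_db.no_table")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    assert(e.getMessage.contains("Table not found"))
}
```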

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 24af848..0a1fa74 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -78,7 +78,7 @@ class AnalysisSuite extends AnalysisTest {
 
   test("resolve relations") {
     assertAnalysisError(
-      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table Not Found: tAbLe"))
+      UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe"))
 
     checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index b08cc8e..6f28920 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -432,6 +432,12 @@ private[spark] object SQLConf {
   val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
     defaultValue = Some(true), doc = "<TODO>")
 
+  val RUN_SQL_ON_FILES = booleanConf("spark.sql.runSQLOnFiles",
+    defaultValue = Some(true),
+    isPublic = false,
+    doc = "When true, we could use `datasource`.`path` as table in SQL query"
+  )
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
     val EXTERNAL_SORT = "spark.sql.planner.externalSort"
@@ -540,6 +546,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def dataFrameRetainGroupColumns: Boolean = getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
+  private[spark] def runSQLOnFile: Boolean = getConf(RUN_SQL_ON_FILES)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
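
The feature is gated by the new internal flag above and is on by default. A hedged sketch of turning it off for a session via the standard `setConf` API; with the flag off, queries like json.`path` fail analysis with "Table not found" instead of being resolved as a data source:

```
// A sketch, not part of this commit: disables direct SQL-on-files resolution.
sqlContext.setConf("spark.sql.runSQLOnFiles", "false")
```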

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e83657a..a107639 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -193,7 +193,7 @@ class SQLContext private[sql](
       override val extendedResolutionRules =
         ExtractPythonUDFs ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(
         datasources.PreWriteCheck(catalog)

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b00e568..abc016b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,13 +17,37 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateSubQueries}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation}
+import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
+
+/**
+ * Try to replace [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ */
+private[sql] class ResolveDataSource(sqlContext: SQLContext) extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case u: UnresolvedRelation if u.tableIdentifier.database.isDefined =>
+      try {
+        val resolved = ResolvedDataSource(
+          sqlContext,
+          userSpecifiedSchema = None,
+          partitionColumns = Array(),
+          provider = u.tableIdentifier.database.get,
+          options = Map("path" -> u.tableIdentifier.table))
+        val plan = LogicalRelation(resolved.relation)
+        u.alias.map(a => Subquery(a, plan)).getOrElse(plan)
+      } catch {
+        case e: ClassNotFoundException => u
+        case e: Exception =>
+          // the provider is valid, but it failed to produce a logical plan
+          u.failAnalysis(e.getMessage)
+      }
+  }
+}
 
 /**
  * A rule to do pre-insert data type casting and field renaming. Before we insert into

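For reference, `ResolveDataSource` reinterprets the two-part identifier: the `database` field becomes the provider and the `table` field becomes the `path` option, so the query resolves to the same relation as an explicit `DataFrameReader` call. A sketch of that equivalence, with a hypothetical path:

```
// json.`/tmp/people` parses as TableIdentifier(table = "/tmp/people",
// database = Some("json")); ResolveDataSource builds the same relation as:
val df = sqlContext.read.format("json").load("/tmp/people")
```
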
http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a35a7f4..298c322 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1796,6 +1796,34 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from json.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+
+    val e1 = intercept[AnalysisException] {
+      sql("select * from in_valid_table")
+    }
+    assert(e1.message.contains("Table not found"))
+
+    val e2 = intercept[AnalysisException] {
+      sql("select * from no_db.no_table")
+    }
+    assert(e2.message.contains("Table not found"))
+
+    val e3 = intercept[AnalysisException] {
+      sql("select * from json.invalid_file")
+    }
+    assert(e3.message.contains("No input paths specified"))
+  }
+
   test("SortMergeJoin returns wrong results when using UnsafeRows") {
     // This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737.
     // This bug will be triggered when Tungsten is enabled and there are multiple

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 38c195b..61f6116 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
-import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.datasources.{ResolveDataSource, DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
 import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
@@ -473,7 +473,7 @@ class HiveContext private[hive](
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
         PreInsertCastAndRename ::
-        Nil
+        (if (conf.runSQLOnFile) new ResolveDataSource(self) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(
         PreWriteCheck(catalog)

http://git-wip-us.apache.org/repos/asf/spark/blob/f8c6bec6/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index c929ba5..396150b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1281,6 +1281,19 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+  test("run sql directly on files") {
+    val df = sqlContext.range(100)
+    withTempPath(f => {
+      df.write.parquet(f.getCanonicalPath)
+      checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from parquet.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
       withTable("jt") {

