Posted to commits@spark.apache.org by zs...@apache.org on 2016/06/02 20:22:45 UTC

spark git commit: [SPARK-15515][SQL] Error Handling in Running SQL Directly On Files

Repository: spark
Updated Branches:
  refs/heads/master 8900c8d8f -> 9aff6f3b1


[SPARK-15515][SQL] Error Handling in Running SQL Directly On Files

#### What changes were proposed in this pull request?
This PR is to address the following issues:

- **ISSUE 1:** For the ORC source format, we report a confusing error message when Hive support is not enabled:
```SQL
SQL Example:
  select id from `org.apache.spark.sql.hive.orc`.`file_path`
Error Message:
  Table or view not found: `org.apache.spark.sql.hive.orc`.`file_path`
```
Instead, we should issue an error message like:
```
Expected Error Message:
  The ORC data source must be used with Hive support enabled
```
- **ISSUE 2:** For the Avro format, we report a confusing error message:
```SQL
SQL Example:
  select id from `avro`.`file_path`
  select id from `com.databricks.spark.avro`.`file_path`
Error Message:
  Table or view not found: `com.databricks.spark.avro`.`file_path`
```
The desired message should be like the following (both issues are reproduced in the sketch after this list):
```
Expected Error Message:
  Failed to find data source: avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro
```

- ~~**ISSUE 3:** Unable to detect incompatible libraries for Spark 2.0 during data source resolution; we report a confusing error message.~~
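
For reference, here is a minimal sketch that reproduces both improved messages. The `file_path` literal is the same placeholder used in the examples above, and the session is assumed to be built locally without Hive support (otherwise ISSUE 1 does not reproduce):
```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}

object DirectFileQueryErrors {
  def main(args: Array[String]): Unit = {
    // Built WITHOUT enableHiveSupport(), so the ORC path hits the new check.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("direct-file-query-errors")
      .getOrCreate()

    // ISSUE 1: ORC without Hive support now fails fast with a clear message.
    try {
      spark.sql("select id from `org.apache.spark.sql.hive.orc`.`file_path`")
    } catch {
      case e: AnalysisException =>
        // Expected: "The ORC data source must be used with Hive support enabled"
        println(e.getMessage)
    }

    // ISSUE 2: the avro name now points the user at the spark-avro package.
    try {
      spark.sql("select id from `avro`.`file_path`")
    } catch {
      case e: AnalysisException =>
        // Expected: "Failed to find data source: avro. Please use Spark package
        // http://spark-packages.org/package/databricks/spark-avro"
        println(e.getMessage)
    }

    spark.stop()
  }
}
```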

**Update**: The latest code changes contain the following:
- For the JDBC format, we added an extra check in the `ResolveDataSource` rule. Without this PR, Spark returns an error message like `Option 'url' not specified`. Now we report `Unsupported data source type for direct query on files: jdbc` (see the test-style sketch after this list).
- Made the data source format name case-insensitive so that error handling behaves consistently with the normal cases.
- Added test cases for all the supported formats.
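
A test-style sketch of the JDBC rejection, mirroring the cases added to `SQLQuerySuite` below. It assumes a ScalaTest suite with a `spark` session in scope (e.g. one mixing in `SharedSQLContext`, which provides `intercept` and `assert`); `file_path` is a placeholder:
```scala
import org.apache.spark.sql.AnalysisException

// `Jdbc` is matched case-insensitively and resolved to the jdbc provider,
// then rejected because that provider does not implement FileFormat.
val e = intercept[AnalysisException] {
  spark.sql("select id from `Jdbc`.`file_path`")
}
assert(e.message.contains(
  "Unsupported data source type for direct query on files: Jdbc"))
```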

#### How was this patch tested?
Added test cases to cover all of the above issues.

Author: gatorsmile <ga...@gmail.com>
Author: xiaoli <li...@gmail.com>
Author: Xiao Li <xi...@Xiaos-MacBook-Pro.local>

Closes #13283 from gatorsmile/runSQLAgainstFile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9aff6f3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9aff6f3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9aff6f3b

Branch: refs/heads/master
Commit: 9aff6f3b1915523432b1921fdd30fa015ed5d670
Parents: 8900c8d
Author: gatorsmile <ga...@gmail.com>
Authored: Thu Jun 2 13:22:43 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Jun 2 13:22:43 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 34 +++++--------
 .../spark/sql/execution/datasources/rules.scala | 14 +++++-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 53 +++++++++++++++++---
 .../spark/sql/sources/DDLSourceLoadSuite.scala  |  5 +-
 .../sql/sources/ResolvedDataSourceSuite.scala   | 14 +++++-
 .../sql/hive/execution/SQLQuerySuite.scala      | 48 +++++++++++++++++-
 6 files changed, 134 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9aff6f3b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 93f1ad0..5f17fdf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,28 +132,20 @@ case class DataSource(
               // Found the data source using fully qualified path
               dataSource
             case Failure(error) =>
-              if (error.isInstanceOf[ClassNotFoundException]) {
-                val className = error.getMessage
-                if (spark2RemovedClasses.contains(className)) {
-                  throw new ClassNotFoundException(s"$className is removed in Spark 2.0. " +
-                    "Please check if your library is compatible with Spark 2.0")
-                }
-              }
-              if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-                throw new ClassNotFoundException(
-                  "The ORC data source must be used with Hive support enabled.", error)
+              if (provider.toLowerCase == "orc" ||
+                  provider.startsWith("org.apache.spark.sql.hive.orc")) {
+                throw new AnalysisException(
+                  "The ORC data source must be used with Hive support enabled")
+              } else if (provider.toLowerCase == "avro" ||
+                  provider == "com.databricks.spark.avro") {
+                throw new AnalysisException(
+                  s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " +
+                    "package http://spark-packages.org/package/databricks/spark-avro")
               } else {
-                if (provider == "avro" || provider == "com.databricks.spark.avro") {
-                  throw new ClassNotFoundException(
-                    s"Failed to find data source: $provider. Please use Spark package " +
-                      "http://spark-packages.org/package/databricks/spark-avro",
-                    error)
-                } else {
-                  throw new ClassNotFoundException(
-                    s"Failed to find data source: $provider. Please find packages at " +
-                      "http://spark-packages.org",
-                    error)
-                }
+                throw new ClassNotFoundException(
+                  s"Failed to find data source: $provider. Please find packages at " +
+                    "http://spark-packages.org",
+                  error)
               }
           }
         } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/9aff6f3b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b622f85..9afd715 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -28,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
 
 /**
- * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
+ * Try to replace [[UnresolvedRelation]]s with [[ResolveDataSource]].
  */
 private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
@@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
           sparkSession,
           paths = u.tableIdentifier.table :: Nil,
           className = u.tableIdentifier.database.get)
+
+        val notSupportDirectQuery = try {
+          !classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
+        } catch {
+          case NonFatal(e) => false
+        }
+        if (notSupportDirectQuery) {
+          throw new AnalysisException("Unsupported data source type for direct query on files: " +
+            s"${u.tableIdentifier.database.get}")
+        }
         val plan = LogicalRelation(dataSource.resolveRelation())
         u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/9aff6f3b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1a7f6eb..4fcd6bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1838,20 +1838,61 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         df)
     })
 
-    val e1 = intercept[AnalysisException] {
+    var e = intercept[AnalysisException] {
       sql("select * from in_valid_table")
     }
-    assert(e1.message.contains("Table or view not found"))
+    assert(e.message.contains("Table or view not found"))
 
-    val e2 = intercept[AnalysisException] {
+    e = intercept[AnalysisException] {
       sql("select * from no_db.no_table").show()
     }
-    assert(e2.message.contains("Table or view not found"))
+    assert(e.message.contains("Table or view not found"))
 
-    val e3 = intercept[AnalysisException] {
+    e = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("Path does not exist"))
+    assert(e.message.contains("Path does not exist"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
+    }
+    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `com.databricks.spark.avro`.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: com.databricks.spark.avro. " +
+      "Please use Spark package http://spark-packages.org/package/databricks/spark-avro"))
+
+    // data source type is case insensitive
+    e = intercept[AnalysisException] {
+      sql(s"select id from Avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from avro.`file_path`")
+    }
+    assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
+      "http://spark-packages.org/package/databricks/spark-avro"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
+    }
+    assert(e.message.contains("Table or view not found: " +
+      "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `Jdbc`.`file_path`")
+    }
+    assert(e.message.contains("Unsupported data source type for direct query on files: Jdbc"))
+
+    e = intercept[AnalysisException] {
+      sql(s"select id from `org.apache.spark.sql.execution.datasources.jdbc`.`file_path`")
+    }
+    assert(e.message.contains("Unsupported data source type for direct query on files: " +
+      "org.apache.spark.sql.execution.datasources.jdbc"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {

http://git-wip-us.apache.org/repos/asf/spark/blob/9aff6f3b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
index f07c330..85ba33e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.sources
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{AnalysisException, SQLContext}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
@@ -42,9 +42,10 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
   }
 
   test("should fail to load ORC without Hive Support") {
-    intercept[ClassNotFoundException] {
+    val e = intercept[AnalysisException] {
       spark.read.format("orc").load()
     }
+    assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9aff6f3b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
index 320aaea..5ea1f32 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.DataSource
 
 class ResolvedDataSourceSuite extends SparkFunSuite {
@@ -60,13 +61,22 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
         classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
   }
 
+  test("csv") {
+    assert(
+      getProvidingClass("csv") ===
+        classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+    assert(
+      getProvidingClass("com.databricks.spark.csv") ===
+        classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
+  }
+
   test("error message for unknown data sources") {
-    val error1 = intercept[ClassNotFoundException] {
+    val error1 = intercept[AnalysisException] {
       getProvidingClass("avro")
     }
     assert(error1.getMessage.contains("spark-packages"))
 
-    val error2 = intercept[ClassNotFoundException] {
+    val error2 = intercept[AnalysisException] {
       getProvidingClass("com.databricks.spark.avro")
     }
     assert(error2.getMessage.contains("spark-packages"))

http://git-wip-us.apache.org/repos/asf/spark/blob/9aff6f3b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b569145..24de223 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1247,11 +1247,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
-  test("run sql directly on files") {
+  test("run sql directly on files - parquet") {
     val df = spark.range(100).toDF()
     withTempPath(f => {
       df.write.parquet(f.getCanonicalPath)
-      checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from Parquet.`${f.getCanonicalPath}`"),
         df)
       checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
         df)
@@ -1260,6 +1261,49 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     })
   }
 
+  test("run sql directly on files - orc") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.orc(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from ORC.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.hive.orc`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from orc.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
+  test("run sql directly on files - csv") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.csv(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select cast(_c0 as int) id from CSV.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(
+        sql(s"select cast(_c0 as int) id from `com.databricks.spark.csv`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select cast(a._c0 as int) id from csv.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
+  test("run sql directly on files - json") {
+    val df = spark.range(100).toDF()
+    withTempPath(f => {
+      df.write.json(f.getCanonicalPath)
+      // data source type is case insensitive
+      checkAnswer(sql(s"select id from jsoN.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
+        df)
+      checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
+        df)
+    })
+  }
+
   test("SPARK-8976 Wrong Result for Rollup #1") {
     checkAnswer(sql(
       "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),

