Posted to commits@spark.apache.org by yh...@apache.org on 2016/04/23 22:20:02 UTC

spark git commit: [SPARK-14865][SQL] Better error handling for view creation.

Repository: spark
Updated Branches:
  refs/heads/master 890abd127 -> e3c1366bb


[SPARK-14865][SQL] Better error handling for view creation.

## What changes were proposed in this pull request?
This patch improves error handling in view creation. CreateViewCommand itself now analyzes the view's SQL query first, and throws an AnalysisException if the query cannot be analyzed.
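
For example, a statement like the one sketched below now fails fast with an AnalysisException instead of persisting a broken view definition (a minimal sketch, assuming an active `sqlContext`; the table and view names are illustrative, mirroring the new test case in this patch):

```scala
import org.apache.spark.sql.AnalysisException

// The SELECT references a table that does not exist, so the statement now
// fails during analysis, before any view definition is written to the catalog.
try {
  sqlContext.sql("CREATE OR REPLACE VIEW myview AS SELECT * FROM table_that_does_not_exist")
} catch {
  case e: AnalysisException => println("Rejected at analysis time: " + e.getMessage)
}
```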

In addition, I added the following two conservative guards to make Spark bugs easier to identify:

1. If there is a bug and the generated view SQL cannot be analyzed, throw an exception at runtime (see the sketch after this list). Note that this is deliberately not an AnalysisException, because the failure is not caused by the user and more likely indicates a bug in Spark.
2. When SQLBuilder is given an unresolved plan, it now also includes the plan in the error message.
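
Guard 1 follows the pattern below, excerpted from `CreateViewCommand.prepareTable` in this patch (`viewSQL` is the generated view SQL and `sqlContext` is in scope within that method):

```scala
import scala.util.control.NonFatal

// Validate the generated view SQL: make sure it parses and analyzes.
// If it does not, the SQL generation itself is suspect, so fail loudly
// with a RuntimeException rather than an AnalysisException.
try {
  sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
} catch {
  case NonFatal(e) =>
    throw new RuntimeException(
      "Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
}
```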

I also took the chance to simplify the internal implementation of CreateViewCommand, and *removed* a fallback path that previously masked exceptions.

## How was this patch tested?
1. Added a unit test for the user-facing error handling.
2. Manually introduced some bugs in Spark to test the internal defensive error handling.
3. Also added a test case for nested views (not super relevant).

Author: Reynold Xin <rx...@databricks.com>

Closes #12633 from rxin/SPARK-14865.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e3c1366b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e3c1366b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e3c1366b

Branch: refs/heads/master
Commit: e3c1366bbcf712f8d7a91640eb11e67a4419e4be
Parents: 890abd1
Author: Reynold Xin <rx...@databricks.com>
Authored: Sat Apr 23 13:19:57 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat Apr 23 13:19:57 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SQLBuilder.scala  |   3 +-
 .../spark/sql/execution/command/views.scala     | 103 ++++++++++---------
 .../spark/sql/hive/execution/SQLViewSuite.scala |  65 +++++++-----
 3 files changed, 100 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e3c1366b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
index d65b3cb..7f26a7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/SQLBuilder.scala
@@ -39,7 +39,8 @@ import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
  * supported by this builder (yet).
  */
 class SQLBuilder(logicalPlan: LogicalPlan) extends Logging {
-  require(logicalPlan.resolved, "SQLBuilder only supports resolved logical query plans")
+  require(logicalPlan.resolved,
+    "SQLBuilder only supports resolved logical query plans. Current plan:\n" + logicalPlan)
 
   def this(df: Dataset[_]) = this(df.queryExecution.analyzed)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e3c1366b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 082f944..7542f9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -63,9 +63,12 @@ case class CreateViewCommand(
   }
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
-    val analzyedPlan = sqlContext.executePlan(child).analyzed
+    // If the plan cannot be analyzed, throw an exception and don't proceed.
+    val qe = sqlContext.executePlan(child)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
 
-    require(tableDesc.schema == Nil || tableDesc.schema.length == analzyedPlan.output.length)
+    require(tableDesc.schema == Nil || tableDesc.schema.length == analyzedPlan.output.length)
     val sessionState = sqlContext.sessionState
 
     if (sessionState.catalog.tableExists(tableIdentifier)) {
@@ -74,7 +77,7 @@ case class CreateViewCommand(
         // already exists.
       } else if (replace) {
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-        sessionState.catalog.alterTable(prepareTable(sqlContext, analzyedPlan))
+        sessionState.catalog.alterTable(prepareTable(sqlContext, analyzedPlan))
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
@@ -85,68 +88,74 @@ case class CreateViewCommand(
     } else {
       // Create the view if it doesn't exist.
       sessionState.catalog.createTable(
-        prepareTable(sqlContext, analzyedPlan), ignoreIfExists = false)
+        prepareTable(sqlContext, analyzedPlan), ignoreIfExists = false)
     }
 
     Seq.empty[Row]
   }
 
-  private def prepareTable(sqlContext: SQLContext, analzyedPlan: LogicalPlan): CatalogTable = {
-    val expandedText = if (sqlContext.conf.canonicalView) {
-      try rebuildViewQueryString(sqlContext, analzyedPlan) catch {
-        case NonFatal(e) => wrapViewTextWithSelect(analzyedPlan)
+  /**
+   * Returns a [[CatalogTable]] that can be saved in the catalog. This method canonicalizes
+   * the SQL based on the analyzed plan, and also creates the proper schema for the view.
+   */
+  private def prepareTable(sqlContext: SQLContext, analyzedPlan: LogicalPlan): CatalogTable = {
+    val viewSQL: String =
+      if (sqlContext.conf.canonicalView) {
+        val logicalPlan =
+          if (tableDesc.schema.isEmpty) {
+            analyzedPlan
+          } else {
+            val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
+              case (attr, col) => Alias(attr, col.name)()
+            }
+            sqlContext.executePlan(Project(projectList, analyzedPlan)).analyzed
+          }
+        new SQLBuilder(logicalPlan).toSQL
+      } else {
+        // When user specified column names for view, we should create a project to do the renaming.
+        // When no column name specified, we still need to create a project to declare the columns
+        // we need, to make us more robust to top level `*`s.
+        val viewOutput = {
+          val columnNames = analyzedPlan.output.map(f => quote(f.name))
+          if (tableDesc.schema.isEmpty) {
+            columnNames.mkString(", ")
+          } else {
+            columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
+              case (name, alias) => s"$name AS $alias"
+            }.mkString(", ")
+          }
+        }
+
+        val viewText = tableDesc.viewText.get
+        val viewName = quote(tableDesc.identifier.table)
+        s"SELECT $viewOutput FROM ($viewText) $viewName"
       }
-    } else {
-      wrapViewTextWithSelect(analzyedPlan)
+
+    // Validate the view SQL - make sure we can parse it and analyze it.
+    // If we cannot analyze the generated query, there is probably a bug in SQL generation.
+    try {
+      sqlContext.sql(viewSQL).queryExecution.assertAnalyzed()
+    } catch {
+      case NonFatal(e) =>
+        throw new RuntimeException(
+          "Failed to analyze the canonicalized SQL. It is possible there is a bug in Spark.", e)
     }
 
-    val viewSchema = {
+    val viewSchema: Seq[CatalogColumn] = {
       if (tableDesc.schema.isEmpty) {
-        analzyedPlan.output.map { a =>
+        analyzedPlan.output.map { a =>
           CatalogColumn(a.name, a.dataType.simpleString)
         }
       } else {
-        analzyedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
+        analyzedPlan.output.zip(tableDesc.schema).map { case (a, col) =>
           CatalogColumn(col.name, a.dataType.simpleString, nullable = true, col.comment)
         }
       }
     }
 
-    tableDesc.copy(schema = viewSchema, viewText = Some(expandedText))
-  }
-
-  private def wrapViewTextWithSelect(analzyedPlan: LogicalPlan): String = {
-    // When user specified column names for view, we should create a project to do the renaming.
-    // When no column name specified, we still need to create a project to declare the columns
-    // we need, to make us more robust to top level `*`s.
-    val viewOutput = {
-      val columnNames = analzyedPlan.output.map(f => quote(f.name))
-      if (tableDesc.schema.isEmpty) {
-        columnNames.mkString(", ")
-      } else {
-        columnNames.zip(tableDesc.schema.map(f => quote(f.name))).map {
-          case (name, alias) => s"$name AS $alias"
-        }.mkString(", ")
-      }
-    }
-
-    val viewText = tableDesc.viewText.get
-    val viewName = quote(tableDesc.identifier.table)
-    s"SELECT $viewOutput FROM ($viewText) $viewName"
-  }
-
-  private def rebuildViewQueryString(sqlContext: SQLContext, analzyedPlan: LogicalPlan): String = {
-    val logicalPlan = if (tableDesc.schema.isEmpty) {
-      analzyedPlan
-    } else {
-      val projectList = analzyedPlan.output.zip(tableDesc.schema).map {
-        case (attr, col) => Alias(attr, col.name)()
-      }
-      sqlContext.executePlan(Project(projectList, analzyedPlan)).analyzed
-    }
-    new SQLBuilder(logicalPlan).toSQL
+    tableDesc.copy(schema = viewSchema, viewText = Some(viewSQL))
   }
 
-  // escape backtick with double-backtick in column name and wrap it with backtick.
+  /** Escape backtick with double-backtick in column name and wrap it with backtick. */
   private def quote(name: String) = s"`${name.replaceAll("`", "``")}`"
 }
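
For reference, the `quote` helper above doubles any embedded backticks and wraps the whole name in backticks; a standalone sketch of its behavior (the column names are illustrative):

```scala
def quote(name: String): String = s"`${name.replaceAll("`", "``")}`"

quote("id")      // returns `id`
quote("my`col")  // returns `my``col`
```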

http://git-wip-us.apache.org/repos/asf/spark/blob/e3c1366b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index cdd5cb3..0d88b3b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -28,26 +28,50 @@ import org.apache.spark.sql.test.SQLTestUtils
 class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
   import hiveContext.implicits._
 
+  override def beforeAll(): Unit = {
+    // Create a simple table with two columns: id and id1
+    sqlContext.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt")
+  }
+
+  override def afterAll(): Unit = {
+    sqlContext.sql(s"DROP TABLE IF EXISTS jt")
+  }
+
+  test("nested views") {
+    withView("jtv1", "jtv2") {
+      sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3").collect()
+      sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6").collect()
+      checkAnswer(sql("select count(*) FROM jtv2"), Row(2))
+    }
+  }
+
+  test("error handling: fail if the view sql itself is invalid") {
+    // A table that does not exist
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect()
+    }
+
+    // A column that does not exist
+    intercept[AnalysisException] {
+      sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect()
+    }
+  }
+
   test("correctly parse CREATE VIEW statement") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt") {
-        val df = (1 until 10).map(i => i -> i).toDF("i", "j")
-        df.write.format("json").saveAsTable("jt")
-        sql(
-          """CREATE VIEW IF NOT EXISTS
-            |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
-            |TBLPROPERTIES ('a' = 'b')
-            |AS SELECT * FROM jt""".stripMargin)
-        checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
-        sql("DROP VIEW testView")
-      }
+      sql(
+        """CREATE VIEW IF NOT EXISTS
+          |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla')
+          |TBLPROPERTIES ('a' = 'b')
+          |AS SELECT * FROM jt""".stripMargin)
+      checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i)))
+      sql("DROP VIEW testView")
     }
   }
 
   test("correctly handle CREATE VIEW IF NOT EXISTS") {
     withSQLConf(SQLConf.NATIVE_VIEW.key -> "true") {
-      withTable("jt", "jt2") {
-        sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+      withTable("jt2") {
         sql("CREATE VIEW testView AS SELECT id FROM jt")
 
         val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -66,8 +90,7 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     test(s"$prefix correctly handle CREATE OR REPLACE VIEW") {
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
-          sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
+        withTable("jt2") {
           sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt")
           checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
 
@@ -90,9 +113,8 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     test(s"$prefix correctly handle ALTER VIEW") {
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt", "jt2") {
+        withTable("jt2") {
           withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
             sql("CREATE VIEW testView AS SELECT id FROM jt")
 
             val df = (1 until 10).map(i => i -> i).toDF("i", "j")
@@ -109,12 +131,9 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       // json table is not hive-compatible, make sure the new flag fix it.
       withSQLConf(
         SQLConf.NATIVE_VIEW.key -> "true", SQLConf.CANONICAL_NATIVE_VIEW.key -> enabled.toString) {
-        withTable("jt") {
-          withView("testView") {
-            sqlContext.range(1, 10).write.format("json").saveAsTable("jt")
-            sql("CREATE VIEW testView AS SELECT id FROM jt")
-            checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
-          }
+        withView("testView") {
+          sql("CREATE VIEW testView AS SELECT id FROM jt")
+          checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i)))
         }
       }
     }

