You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/10/14 14:23:57 UTC

[spark] branch branch-3.2 updated: [SPARK-36905] Fix reading hive views without explicit column names

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 596b60e  [SPARK-36905] Fix reading hive views without explicit column names
596b60e is described below

commit 596b60e3e6b5d833b1bbac2f1d51b3c8af25a185
Author: Linhong Liu <li...@databricks.com>
AuthorDate: Thu Oct 14 22:22:28 2021 +0800

    [SPARK-36905] Fix reading hive views without explicit column names
    
    ### What changes were proposed in this pull request?
    When a Hive view is created without explicit column names, Spark cannot read it correctly. For example:
    ```
    -- use hive to create the view
    CREATE VIEW test_view AS SELECT 1 FROM some_table
    -- use spark to read the view
    SELECT * FROM test_view
    ```
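    The query fails with the exception `cannot resolve '_c0' given input columns: [1]`, because Hive stores the auto-generated column name `_c0` in the view schema while the parsed view query outputs a column named `1`.
    The problematic plan is: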
    ```
    'Project [upcast('_c0, IntegerType) AS _c0#3]
     +- Project [1 AS 1#4]
        +- SubqueryAlias spark_catalog.default.some_table
           +- Relation default.some_table[id#1L] orc
    ```
    
    This PR fixes the issue by handling views created by Hive separately: their output columns are matched to the view schema by ordinal instead of by name.
    
    ### Why are the changes needed?
    Bug fix: Spark fails to read Hive-created views that have no explicit column names.
    
    ### Does this PR introduce _any_ user-facing change?
    No, this fixes a regression.
    
    ### How was this patch tested?
    A newly added unit test in `SQLQuerySuite`.
    
    Closes #34254 from linhongliu-db/SPARK-36905.
    
    Lead-authored-by: Linhong Liu <li...@databricks.com>
    Co-authored-by: Linhong Liu <67...@users.noreply.github.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 2cc3aead5b17b2ebfb7f88f60291d412643240c7)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/catalog/SessionCatalog.scala      | 85 +++++++++++++---------
 .../spark/sql/hive/execution/SQLQuerySuite.scala   | 10 +++
 2 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4860f46..8bba6bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -862,6 +862,13 @@ class SessionCatalog(
     }
   }
 
+  private def isHiveCreatedView(metadata: CatalogTable): Boolean = {
+    // For views created by hive without explicit column names, there will be auto-generated
+    // column names like "_c0", "_c1", "_c2"...
+    metadata.viewQueryColumnNames.isEmpty &&
+      metadata.schema.fieldNames.exists(_.matches("_c[0-9]+"))
+  }
+
   private def fromCatalogTable(metadata: CatalogTable, isTempView: Boolean): View = {
     val viewText = metadata.viewText.getOrElse {
       throw new IllegalStateException("Invalid view without text.")
@@ -870,42 +877,52 @@ class SessionCatalog(
     val parsedPlan = SQLConf.withExistingConf(View.effectiveSQLConf(viewConfigs, isTempView)) {
       parser.parsePlan(viewText)
     }
-    val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
-      // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
-      // output is the same with the view output.
-      metadata.schema.fieldNames.toSeq
-    } else {
-      assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
-      metadata.viewQueryColumnNames
-    }
+    val projectList = if (!isHiveCreatedView(metadata)) {
+      val viewColumnNames = if (metadata.viewQueryColumnNames.isEmpty) {
+        // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
+        // output is the same with the view output.
+        metadata.schema.fieldNames.toSeq
+      } else {
+        assert(metadata.viewQueryColumnNames.length == metadata.schema.length)
+        metadata.viewQueryColumnNames
+      }
 
-    // For view queries like `SELECT * FROM t`, the schema of the referenced table/view may
-    // change after the view has been created. We need to add an extra SELECT to pick the columns
-    // according to the recorded column names (to get the correct view column ordering and omit
-    // the extra columns that we don't require), with UpCast (to make sure the type change is
-    // safe) and Alias (to respect user-specified view column names) according to the view schema
-    // in the catalog.
-    // Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
-    // SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
-    // number of duplications, and pick the corresponding attribute by ordinal.
-    val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
-    val normalizeColName: String => String = if (viewConf.caseSensitiveAnalysis) {
-      identity
+      // For view queries like `SELECT * FROM t`, the schema of the referenced table/view may
+      // change after the view has been created. We need to add an extra SELECT to pick the columns
+      // according to the recorded column names (to get the correct view column ordering and omit
+      // the extra columns that we don't require), with UpCast (to make sure the type change is
+      // safe) and Alias (to respect user-specified view column names) according to the view schema
+      // in the catalog.
+      // Note that, the column names may have duplication, e.g. `CREATE VIEW v(x, y) AS
+      // SELECT 1 col, 2 col`. We need to make sure that the matching attributes have the same
+      // number of duplications, and pick the corresponding attribute by ordinal.
+      val viewConf = View.effectiveSQLConf(metadata.viewSQLConfigs, isTempView)
+      val normalizeColName: String => String = if (viewConf.caseSensitiveAnalysis) {
+        identity
+      } else {
+        _.toLowerCase(Locale.ROOT)
+      }
+      val nameToCounts = viewColumnNames.groupBy(normalizeColName).mapValues(_.length)
+      val nameToCurrentOrdinal = scala.collection.mutable.HashMap.empty[String, Int]
+      val viewDDL = buildViewDDL(metadata, isTempView)
+
+      viewColumnNames.zip(metadata.schema).map { case (name, field) =>
+        val normalizedName = normalizeColName(name)
+        val count = nameToCounts(normalizedName)
+        val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
+        nameToCurrentOrdinal(normalizedName) = ordinal + 1
+        val col = GetViewColumnByNameAndOrdinal(
+          metadata.identifier.toString, name, ordinal, count, viewDDL)
+        Alias(UpCast(col, field.dataType), field.name)(explicitMetadata = Some(field.metadata))
+      }
     } else {
-      _.toLowerCase(Locale.ROOT)
-    }
-    val nameToCounts = viewColumnNames.groupBy(normalizeColName).mapValues(_.length)
-    val nameToCurrentOrdinal = scala.collection.mutable.HashMap.empty[String, Int]
-    val viewDDL = buildViewDDL(metadata, isTempView)
-
-    val projectList = viewColumnNames.zip(metadata.schema).map { case (name, field) =>
-      val normalizedName = normalizeColName(name)
-      val count = nameToCounts(normalizedName)
-      val ordinal = nameToCurrentOrdinal.getOrElse(normalizedName, 0)
-      nameToCurrentOrdinal(normalizedName) = ordinal + 1
-      val col = GetViewColumnByNameAndOrdinal(
-        metadata.identifier.toString, name, ordinal, count, viewDDL)
-      Alias(UpCast(col, field.dataType), field.name)(explicitMetadata = Some(field.metadata))
+      // For view created by hive, the parsed view plan may have different output columns with
+      // the schema stored in metadata. For example: `CREATE VIEW v AS SELECT 1 FROM t`
+      // the schema in metadata will be `_c0` while the parsed view plan has column named `1`
+      metadata.schema.zipWithIndex.map { case (field, index) =>
+        val col = GetColumnByOrdinal(index, field.dataType)
+        Alias(UpCast(col, field.dataType), field.name)(explicitMetadata = Some(field.metadata))
+      }
     }
     View(desc = metadata, isTempView = isTempView, child = Project(projectList, parsedPlan))
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index cc92a28..5db7050 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -2632,6 +2632,16 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
       }
     }
   }
+
+  test("SPARK-36905: read hive views without without explicit column names") {
+    withTable("t1") {
+      withView("test_view") {
+        hiveClient.runSqlHive("create table t1 stored as avro as select 2 as id")
+        hiveClient.runSqlHive("create view test_view as select 1, id + 1 from t1")
+        checkAnswer(sql("select * from test_view"), Seq(Row(1, 3)))
+      }
+    }
+  }
 }
 
 @SlowHiveTest

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org