Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/03/15 20:05:11 UTC

[GitHub] [spark] imback82 commented on a change in pull request #31811: [SPARK-34719][SQL][3.0/3.1] Correctly resolve the view query with duplicated column names

imback82 commented on a change in pull request #31811:
URL: https://github.com/apache/spark/pull/31811#discussion_r594646085



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
##########
@@ -57,15 +60,43 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport {
     // The child has different output attributes from the View operator. Adds a Project over
     // the child of the view.
     case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) =>
+      // Use the stored view query output column names to find the matching attributes. The column
+      // names may have duplication, e.g. `CREATE VIEW v AS SELECT 1 col, 2 col`. We need to make
+      // sure that the matching attributes have the same number of duplications, and pick the
+      // corresponding attribute by ordinal.
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // Find the attribute that has the expected attribute name from an attribute list, the names
-        // are compared using conf.resolver.
-        // `CheckAnalysis` already guarantees the expected attribute can be found for sure.
-        desc.viewQueryColumnNames.map { colName =>
-          child.output.find(attr => resolver(attr.name, colName)).get
+        val normalizeColName: String => String = if (conf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+        val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, Seq[Attribute]]
+
+        val outputAttrs = queryColumnNames.map { colName =>
+          val normalized = normalizeColName(colName)
+          val count = nameToCounts.getOrElse(normalized, 0)
+          val matchedCols = nameToMatchedCols.getOrElseUpdate(
+            normalized, child.output.filter(attr => resolver(attr.name, colName)))
+          if (matchedCols.length - 1 < count) {
+            throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be resolved.")

Review comment:
       nit: Should we be more specific about the "schema change" (e.g., one fewer duplicated column than expected)?
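       For context, a hypothetical scenario in which this branch fires (the table and view names below are made up and the exact DDL may differ): the stored view schema expects two `col` columns, but after the underlying change the re-analyzed view query exposes only one, so the second ordinal lookup has nothing left to pick.
```scala
// Hypothetical repro sketch, not taken from the PR tests.
spark.sql("CREATE TABLE t1(col INT) USING parquet")
spark.sql("CREATE TABLE t2(col INT) USING parquet")
spark.sql("CREATE VIEW v AS SELECT * FROM t1, t2")   // stored view column names: [col, col]
spark.sql("DROP TABLE t2")
spark.sql("CREATE TABLE t2(col2 INT) USING parquet") // t2 no longer exposes `col`
spark.sql("SELECT * FROM v")                         // only one `col` matches -> AnalysisException above
```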

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
##########
@@ -57,15 +60,43 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport {
     // The child has different output attributes from the View operator. Adds a Project over
     // the child of the view.
     case v @ View(desc, _, output, child) if child.resolved && !v.sameOutput(child) =>
+      // Use the stored view query output column names to find the matching attributes. The column
+      // names may have duplication, e.g. `CREATE VIEW v AS SELECT 1 col, 2 col`. We need to make
+      // sure that the matching attributes have the same number of duplications, and pick the
+      // corresponding attribute by ordinal.
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // Find the attribute that has the expected attribute name from an attribute list, the names
-        // are compared using conf.resolver.
-        // `CheckAnalysis` already guarantees the expected attribute can be found for sure.
-        desc.viewQueryColumnNames.map { colName =>
-          child.output.find(attr => resolver(attr.name, colName)).get
+        val normalizeColName: String => String = if (conf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+        val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, Seq[Attribute]]
+
+        val outputAttrs = queryColumnNames.map { colName =>
+          val normalized = normalizeColName(colName)
+          val count = nameToCounts.getOrElse(normalized, 0)
+          val matchedCols = nameToMatchedCols.getOrElseUpdate(
+            normalized, child.output.filter(attr => resolver(attr.name, colName)))
+          if (matchedCols.length - 1 < count) {
+            throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be resolved.")
+          }
+          val col = matchedCols(count)
+          nameToCounts(normalized) = count + 1
+          col

Review comment:
       nit: I think you can simplify it as:
   ```scala
   nameToCounts(normalized) = count + 1
   matchedCols(count)
   ```
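       For illustration, a self-contained sketch of the duplicate-aware matching with that simplification applied (plain Scala, with strings standing in for `child.output` attributes and a case-insensitive comparison standing in for `conf.resolver`; the helper name is made up):
```scala
import java.util.Locale
import scala.collection.mutable

def matchByOrdinal(queryColumnNames: Seq[String], childOutput: Seq[String]): Seq[String] = {
  val resolver = (a: String, b: String) => a.equalsIgnoreCase(b)
  val normalize = (name: String) => name.toLowerCase(Locale.ROOT)
  val nameToCounts = mutable.HashMap.empty[String, Int]
  val nameToMatchedCols = mutable.HashMap.empty[String, Seq[String]]

  queryColumnNames.map { colName =>
    val normalized = normalize(colName)
    // How many times this (normalized) name has already been consumed.
    val count = nameToCounts.getOrElse(normalized, 0)
    // All child columns matching this name, computed once per name.
    val matchedCols = nameToMatchedCols.getOrElseUpdate(
      normalized, childOutput.filter(attr => resolver(attr, colName)))
    if (matchedCols.length <= count) {
      sys.error(s"column $colName cannot be resolved") // stands in for the AnalysisException
    }
    nameToCounts(normalized) = count + 1
    matchedCols(count) // the block's last expression is the result, so no extra `col` val is needed
  }
}

// matchByOrdinal(Seq("col", "col"), Seq("COL", "col")) == Seq("COL", "col")
// matchByOrdinal(Seq("col", "col"), Seq("col"))        // fails: column col cannot be resolved
```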




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org