You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org> on 2023/04/13 16:05:24 UTC

[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40732: [SPARK-43085][SQL] Support column DEFAULT assignment for multi-part table names

ryan-johnson-databricks commented on code in PR #40732:
URL: https://github.com/apache/spark/pull/40732#discussion_r1165745301


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -550,45 +551,24 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    * Returns the schema for the target table of a DML command, looking into the catalog if needed.
    */
   private def getSchemaForTargetTable(table: LogicalPlan): Option[StructType] = {
-    // First find the source relation. Note that we use 'collectFirst' to descend past any
-    // SubqueryAlias nodes that may be present.
-    val source: Option[LogicalPlan] = table.collectFirst {
-      case r: NamedRelation if !r.skipSchemaResolution =>
-        // Here we only resolve the default columns in the tables that require schema resolution
-        // during write operations.
-        r
-      case r: UnresolvedCatalogRelation => r
-    }
-    // Check if the target table is already resolved. If so, return the computed schema.
-    source.foreach { r =>
-      if (r.schema.fields.nonEmpty) {
-        return Some(r.schema)
-      }
-    }
-    // Lookup the relation from the catalog by name. This either succeeds or returns some "not
-    // found" error. In the latter cases, return out of this rule without changing anything and let
-    // the analyzer return a proper error message elsewhere.
-    val tableName: TableIdentifier = source match {
-      case Some(r: UnresolvedRelation) => TableIdentifier(r.name)
-      case Some(r: UnresolvedCatalogRelation) => r.tableMeta.identifier
-      case _ => return None
-    }
-    // First try to get the table metadata directly. If that fails, check for views below.
-    if (catalog.tableExists(tableName)) {
-      return Some(catalog.getTableMetadata(tableName).schema)
-    }
-    val lookup: LogicalPlan = try {
-      catalog.lookupRelation(tableName)
-    } catch {
-      case _: AnalysisException => return None
-    }
-    lookup match {
-      case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
-        Some(r.tableMeta.schema)
-      case SubqueryAlias(_, r: View) if r.isTempView =>
-        Some(r.desc.schema)
-      case _ => None
+    table.foreach {
+      case r: UnresolvedCatalogRelation =>
+        return Some(r.tableMeta.schema)
+      case d: DataSourceV2Relation =>
+        return Some(CatalogV2Util.v2ColumnsToStructType(d.table.columns()))
+      case r: UnresolvedRelation if !r.skipSchemaResolution =>
+        val resolved = resolveRelations(r)
+        resolved.collectFirst {
+          case u: UnresolvedCatalogRelation =>

Review Comment:
   We already check for `UnresolvedCatalogRelation` above... what happens if `resolveRelations` returns one of the other types of interest, like `DataSourceV2Relation`?
   
   Also, shouldn't we somehow install the result of the `resolveRelations` call, so that we don't repeat the work later?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -550,45 +551,24 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    * Returns the schema for the target table of a DML command, looking into the catalog if needed.
    */
   private def getSchemaForTargetTable(table: LogicalPlan): Option[StructType] = {
-    // First find the source relation. Note that we use 'collectFirst' to descend past any
-    // SubqueryAlias nodes that may be present.
-    val source: Option[LogicalPlan] = table.collectFirst {
-      case r: NamedRelation if !r.skipSchemaResolution =>
-        // Here we only resolve the default columns in the tables that require schema resolution
-        // during write operations.
-        r
-      case r: UnresolvedCatalogRelation => r
-    }
-    // Check if the target table is already resolved. If so, return the computed schema.
-    source.foreach { r =>
-      if (r.schema.fields.nonEmpty) {
-        return Some(r.schema)
-      }
-    }
-    // Lookup the relation from the catalog by name. This either succeeds or returns some "not
-    // found" error. In the latter cases, return out of this rule without changing anything and let
-    // the analyzer return a proper error message elsewhere.
-    val tableName: TableIdentifier = source match {
-      case Some(r: UnresolvedRelation) => TableIdentifier(r.name)
-      case Some(r: UnresolvedCatalogRelation) => r.tableMeta.identifier
-      case _ => return None
-    }
-    // First try to get the table metadata directly. If that fails, check for views below.
-    if (catalog.tableExists(tableName)) {
-      return Some(catalog.getTableMetadata(tableName).schema)
-    }
-    val lookup: LogicalPlan = try {
-      catalog.lookupRelation(tableName)
-    } catch {
-      case _: AnalysisException => return None
-    }
-    lookup match {
-      case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
-        Some(r.tableMeta.schema)
-      case SubqueryAlias(_, r: View) if r.isTempView =>
-        Some(r.desc.schema)
-      case _ => None
+    table.foreach {
+      case r: UnresolvedCatalogRelation =>
+        return Some(r.tableMeta.schema)
+      case d: DataSourceV2Relation =>
+        return Some(CatalogV2Util.v2ColumnsToStructType(d.table.columns()))
+      case r: UnresolvedRelation if !r.skipSchemaResolution =>
+        val resolved = resolveRelations(r)
+        resolved.collectFirst {
+          case u: UnresolvedCatalogRelation =>
+            return Some(u.tableMeta.schema)
+        }
+        resolved.collectFirst {
+          case v: View =>
+            return Some(v.schema)

Review Comment:
   Is it intentional to look for a `View` only after failing to find any `UnresolvedCatalogRelation`?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -550,45 +551,24 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    * Returns the schema for the target table of a DML command, looking into the catalog if needed.
    */
   private def getSchemaForTargetTable(table: LogicalPlan): Option[StructType] = {
-    // First find the source relation. Note that we use 'collectFirst' to descend past any
-    // SubqueryAlias nodes that may be present.
-    val source: Option[LogicalPlan] = table.collectFirst {
-      case r: NamedRelation if !r.skipSchemaResolution =>
-        // Here we only resolve the default columns in the tables that require schema resolution
-        // during write operations.
-        r
-      case r: UnresolvedCatalogRelation => r
-    }
-    // Check if the target table is already resolved. If so, return the computed schema.
-    source.foreach { r =>
-      if (r.schema.fields.nonEmpty) {
-        return Some(r.schema)
-      }
-    }
-    // Lookup the relation from the catalog by name. This either succeeds or returns some "not
-    // found" error. In the latter cases, return out of this rule without changing anything and let
-    // the analyzer return a proper error message elsewhere.
-    val tableName: TableIdentifier = source match {
-      case Some(r: UnresolvedRelation) => TableIdentifier(r.name)
-      case Some(r: UnresolvedCatalogRelation) => r.tableMeta.identifier
-      case _ => return None
-    }
-    // First try to get the table metadata directly. If that fails, check for views below.
-    if (catalog.tableExists(tableName)) {
-      return Some(catalog.getTableMetadata(tableName).schema)
-    }
-    val lookup: LogicalPlan = try {
-      catalog.lookupRelation(tableName)
-    } catch {
-      case _: AnalysisException => return None
-    }
-    lookup match {
-      case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
-        Some(r.tableMeta.schema)
-      case SubqueryAlias(_, r: View) if r.isTempView =>
-        Some(r.desc.schema)
-      case _ => None
+    table.foreach {
+      case r: UnresolvedCatalogRelation =>
+        return Some(r.tableMeta.schema)
+      case d: DataSourceV2Relation =>
+        return Some(CatalogV2Util.v2ColumnsToStructType(d.table.columns()))
+      case r: UnresolvedRelation if !r.skipSchemaResolution =>
+        val resolved = resolveRelations(r)
+        resolved.collectFirst {
+          case u: UnresolvedCatalogRelation =>

Review Comment:
   Also... in Analyzer.scala I see that `ResolveRelations` runs _before_ `ResolveDefaultColumns` -- so why would the plan here ever contain an `UnresolvedRelation`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org