You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/14 07:02:13 UTC

[spark] branch master updated: [SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 816f6dd  [SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN
816f6dd is described below

commit 816f6dd13eb35908bab8f1524c7629a5c6d585c6
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Wed Apr 14 07:01:40 2021 +0000

    [SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN
    
    ### What changes were proposed in this pull request?
    
    Adds the duplicated common columns as hidden columns to the Projection used to rewrite NATURAL/USING JOINs.
    
    ### Why are the changes needed?
    
    Allows users to resolve either side of the NATURAL/USING JOIN's common keys.
    Previously, the user could only resolve the following columns:
    
    | Join type | Left key columns | Right key columns |
    | --- | --- | --- |
    | Inner | Yes | No |
    | Left | Yes | No |
    | Right | No | Yes |
    | Outer | No | No |
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The user can now symmetrically resolve the common columns from a NATURAL/USING JOIN.
    
    ### How was this patch tested?
    
    SQL-side tests. The behavior matches PostgreSQL and MySQL.
    
    Closes #31666 from karenfeng/spark-34527.
    
    Authored-by: Karen Feng <ka...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 100 +++---
 .../spark/sql/catalyst/analysis/unresolved.scala   |  15 +-
 .../catalyst/plans/logical/AnalysisHelper.scala    |   1 +
 .../plans/logical/basicLogicalOperators.scala      |  13 +-
 .../apache/spark/sql/catalyst/util/package.scala   |  26 +-
 .../datasources/v2/DataSourceV2Implicits.scala     |  10 +-
 .../resources/sql-tests/inputs/natural-join.sql    |  53 ++++
 .../test/resources/sql-tests/inputs/using-join.sql |  70 +++++
 .../sql-tests/results/natural-join.sql.out         | 249 ++++++++++++++-
 .../resources/sql-tests/results/using-join.sql.out | 338 +++++++++++++++++++++
 .../org/apache/spark/sql/DataFrameJoinSuite.scala  |  22 ++
 11 files changed, 843 insertions(+), 54 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d32ec06..7a11396 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -917,41 +917,30 @@ class Analyzer(override val catalogManager: CatalogManager)
    * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
    *
    * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
-   * but the relation's output does not include the metadata columns until the relation is replaced
-   * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
-   * relation's output, the analyzer will detect that nothing produces the columns.
+   * but the relation's output does not include the metadata columns until the relation is replaced.
+   * Unless this rule adds metadata to the relation's output, the analyzer will detect that nothing
+   * produces the columns.
    *
    * This rule only adds metadata columns when a node is resolved but is missing input from its
    * children. This ensures that metadata columns are not added to the plan unless they are used. By
    * checking only resolved nodes, this ensures that * expansion is already done so that metadata
-   * columns are not accidentally selected by *.
+   * columns are not accidentally selected by *. This rule resolves operators downwards to avoid
+   * projecting away metadata columns prematurely.
    */
   object AddMetadataColumns extends Rule[LogicalPlan] {
-    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
 
-    private def hasMetadataCol(plan: LogicalPlan): Boolean = {
-      plan.expressions.exists(_.find {
-        case a: Attribute => a.isMetadataCol
-        case _ => false
-      }.isDefined)
-    }
-
-    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
-      case r: DataSourceV2Relation => r.withMetadataColumns()
-      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
-    }
+    import org.apache.spark.sql.catalyst.util._
 
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+      // Add metadata output to all node types
       case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
         val inputAttrs = AttributeSet(node.children.flatMap(_.output))
-        val metaCols = node.expressions.flatMap(_.collect {
-          case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
-        })
+        val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains)
         if (metaCols.isEmpty) {
           node
         } else {
           val newNode = addMetadataCol(node)
-          // We should not change the output schema of the plan. We should project away the extr
+          // We should not change the output schema of the plan. We should project away the extra
           // metadata columns if necessary.
           if (newNode.sameOutput(node)) {
             newNode
@@ -960,6 +949,38 @@ class Analyzer(override val catalogManager: CatalogManager)
           }
         }
     }
+
+    private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
+      plan.expressions.flatMap(_.collect {
+        case a: Attribute if a.isMetadataCol => a
+        case a: Attribute
+          if plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId)) =>
+          plan.children.collectFirst {
+            case c if c.metadataOutput.exists(_.exprId == a.exprId) =>
+              c.metadataOutput.find(_.exprId == a.exprId).get
+          }.get
+      })
+    }
+
+    private def hasMetadataCol(plan: LogicalPlan): Boolean = {
+      plan.expressions.exists(_.find {
+        case a: Attribute =>
+          // If an attribute is resolved before being labeled as metadata
+          // (i.e. from the originating Dataset), we check with expression ID
+          a.isMetadataCol ||
+            plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))
+        case _ => false
+      }.isDefined)
+    }
+
+    private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
+      case r: DataSourceV2Relation => r.withMetadataColumns()
+      case p: Project =>
+        p.copy(
+          projectList = p.metadataOutput ++ p.projectList,
+          child = addMetadataCol(p.child))
+      case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
+    }
   }
 
   /**
@@ -1898,10 +1919,10 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
 
     /**
-     * This method tries to resolve expressions and find missing attributes recursively. Specially,
-     * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved
-     * attributes which are missed from child output. This method tries to find the missing
-     * attributes out and add into the projection.
+     * This method tries to resolve expressions and find missing attributes recursively.
+     * Specifically, when the expressions used in `Sort` or `Filter` contain unresolved attributes
+     * or resolved attributes which are missing from child output, this method tries to find the
+     * missing attributes and add them into the projection.
      */
     private def resolveExprsAndAddMissingAttrs(
         exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = {
@@ -3150,7 +3171,9 @@ class Analyzer(override val catalogManager: CatalogManager)
       joinType: JoinType,
       joinNames: Seq[String],
       condition: Option[Expression],
-      hint: JoinHint) = {
+      hint: JoinHint): LogicalPlan = {
+    import org.apache.spark.sql.catalyst.util._
+
     val leftKeys = joinNames.map { keyName =>
       left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
         throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
@@ -3170,26 +3193,33 @@ class Analyzer(override val catalogManager: CatalogManager)
     val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))
 
     // the output list looks like: join keys, columns from left, columns from right
-    val projectList = joinType match {
+    val (projectList, hiddenList) = joinType match {
       case LeftOuter =>
-        leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
+        (leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)), rightKeys)
       case LeftExistence(_) =>
-        leftKeys ++ lUniqueOutput
+        (leftKeys ++ lUniqueOutput, Seq.empty)
       case RightOuter =>
-        rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
+        (rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput, leftKeys)
       case FullOuter =>
         // in full outer join, joinCols should be non-null if there is.
         val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
-        joinedCols ++
+        (joinedCols ++
           lUniqueOutput.map(_.withNullability(true)) ++
-          rUniqueOutput.map(_.withNullability(true))
+          rUniqueOutput.map(_.withNullability(true)),
+          leftKeys ++ rightKeys)
       case _ : InnerLike =>
-        leftKeys ++ lUniqueOutput ++ rUniqueOutput
+        (leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys)
       case _ =>
         sys.error("Unsupported natural join type " + joinType)
     }
-    // use Project to trim unnecessary fields
-    Project(projectList, Join(left, right, joinType, newCondition, hint))
+    // use Project to hide duplicated common keys
+    // propagate hidden columns from nested USING/NATURAL JOINs
+    val project = Project(projectList, Join(left, right, joinType, newCondition, hint))
+    project.setTagValue(
+      Project.hiddenOutputTag,
+      hiddenList.map(_.markAsSupportsQualifiedStar()) ++
+        project.child.metadataOutput.filter(_.supportsQualifiedStar))
+    project
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 3b2f4ca..5001e2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.types.{DataType, Metadata, StructType}
@@ -340,11 +340,11 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
    * Returns true if the nameParts is a subset of the last elements of qualifier of the attribute.
    *
    * For example, the following should all return true:
-   *   - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", "ns2", "t") and
+   *   - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and
    *     qualifier is Seq("ns1", "ns2", "t").
-   *   - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and
+   *   - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and
    *     qualifier is Seq("ns1", "ns2", "t").
-   *   - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and
+   *   - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
    *     qualifier is Seq("ns1", "ns2", "t").
    */
   private def matchedQualifier(
@@ -366,10 +366,13 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
   override def expand(
       input: LogicalPlan,
       resolver: Resolver): Seq[NamedExpression] = {
-    // If there is no table specified, use all input attributes.
+    // If there is no table specified, use all non-hidden input attributes.
     if (target.isEmpty) return input.output
 
-    val expandedAttributes = input.output.filter(matchedQualifier(_, target.get, resolver))
+    // If there is a table specified, use hidden input attributes as well
+    val hiddenOutput = input.metadataOutput.filter(_.supportsQualifiedStar)
+    val expandedAttributes = (hiddenOutput ++ input.output).filter(
+      matchedQualifier(_, target.get, resolver))
 
     if (expandedAttributes.nonEmpty) return expandedAttributes
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 5a888de..9e9bc69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -145,6 +145,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
           self.markRuleAsIneffective(ruleId)
           self
         } else {
+          afterRule.copyTagsFrom(self)
           afterRule
         }
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6ebb1be..b31e930 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -25,17 +25,18 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.trees.TreePattern.{
   INNER_LIKE_JOIN, JOIN, LEFT_SEMI_OR_ANTI_JOIN, NATURAL_LIKE_JOIN, OUTER_JOIN, TreePattern
 }
-import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.random.RandomSampler
 
 /**
- * When planning take() or collect() operations, this special node that is inserted at the top of
+ * When planning take() or collect() operations, this special node is inserted at the top of
  * the logical plan before invoking the query planner.
  *
  * Rules can pattern-match on this node in order to apply transformations that only take effect
@@ -69,7 +70,6 @@ object Subquery {
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
     extends OrderPreservingUnaryNode {
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
-  override def metadataOutput: Seq[Attribute] = Nil
   override def maxRows: Option[Long] = child.maxRows
 
   override lazy val resolved: Boolean = {
@@ -86,10 +86,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
   override lazy val validConstraints: ExpressionSet =
     getAllValidConstraints(projectList)
 
+  override def metadataOutput: Seq[Attribute] =
+    getTagValue(Project.hiddenOutputTag).getOrElse(Nil)
+
   override protected def withNewChildInternal(newChild: LogicalPlan): Project =
     copy(child = newChild)
 }
 
+object Project {
+  val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = TreeNodeTag[Seq[Attribute]]("hidden_output")
+}
+
 /**
  * Applies a [[Generator]] to a stream of input rows, combining the
  * output of each into a new stream of rows.  This operation is similar to a `flatMap` in functional
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 4f8fcd9..33fe48d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{NumericType, StringType}
+import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -201,4 +201,28 @@ package object util extends Logging {
   def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
     truncatedString(seq, "", sep, "", maxFields)
   }
+
+  val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  implicit class MetadataColumnHelper(attr: Attribute) {
+    /**
+     * If set, this metadata column is a candidate during qualified star expansions.
+     */
+    val SUPPORTS_QUALIFIED_STAR = "__supports_qualified_star"
+
+    def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
+      attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
+
+    def supportsQualifiedStar: Boolean = attr.isMetadataCol &&
+      attr.metadata.contains(SUPPORTS_QUALIFIED_STAR) &&
+      attr.metadata.getBoolean(SUPPORTS_QUALIFIED_STAR)
+
+    def markAsSupportsQualifiedStar(): Attribute = attr.withMetadata(
+      new MetadataBuilder()
+        .withMetadata(attr.metadata)
+        .putBoolean(METADATA_COL_ATTR_KEY, true)
+        .putBoolean(SUPPORTS_QUALIFIED_STAR, true)
+        .build()
+    )
+  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index c0d24e2..efd3ffe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 object DataSourceV2Implicits {
-  private val METADATA_COL_ATTR_KEY = "__metadata_col"
-
   implicit class TableHelper(table: Table) {
     def asReadable: SupportsRead = {
       table match {
@@ -101,11 +100,6 @@ object DataSourceV2Implicits {
     def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
   }
 
-  implicit class MetadataColumnHelper(attr: Attribute) {
-    def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
-      attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
-  }
-
   implicit class OptionsHelper(options: Map[String, String]) {
     def asOptions: CaseInsensitiveStringMap = {
       new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
index 71a5015..060f15e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
@@ -10,6 +10,19 @@ create temporary view nt2 as select * from values
   ("one", 5)
   as nt2(k, v2);
 
+create temporary view nt3 as select * from values
+  ("one", 4),
+  ("two", 5),
+  ("one", 6)
+  as nt3(k, v3);
+
+create temporary view nt4 as select * from values
+  ("one", 7),
+  ("two", 8),
+  ("one", 9)
+  as nt4(k, v4);
+
+SELECT * FROM nt1 natural join nt2;
 
 SELECT * FROM nt1 natural join nt2 where k = "one";
 
@@ -18,3 +31,43 @@ SELECT * FROM nt1 natural left join nt2 order by v1, v2;
 SELECT * FROM nt1 natural right join nt2 order by v1, v2;
 
 SELECT count(*) FROM nt1 natural full outer join nt2;
+
+SELECT k FROM nt1 natural join nt2;
+
+SELECT k FROM nt1 natural join nt2 where k = "one";
+
+SELECT nt1.* FROM nt1 natural join nt2;
+
+SELECT nt2.* FROM nt1 natural join nt2;
+
+SELECT sbq.* from (SELECT * FROM nt1 natural join nt2) sbq;
+
+SELECT sbq.k from (SELECT * FROM nt1 natural join nt2) sbq;
+
+SELECT nt1.*, nt2.* FROM nt1 natural join nt2;
+
+SELECT *, nt2.k FROM nt1 natural join nt2;
+
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2;
+
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one";
+
+SELECT * FROM (SELECT * FROM nt1 natural join nt2);
+
+SELECT * FROM (SELECT nt1.*, nt2.* FROM nt1 natural join nt2);
+
+SELECT * FROM (SELECT nt1.v1, nt2.k FROM nt1 natural join nt2);
+
+SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2);
+
+SELECT * FROM nt1 natural join nt2 natural join nt3;
+
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 natural join nt3;
+
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k;
+
+SELECT * FROM nt1 natural join nt2 join nt3 on nt1.k = nt3.k;
+
+SELECT * FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k;
+
+SELECT nt1.*, nt2.*, nt3.*, nt4.* FROM nt1 natural join nt2 natural join nt3 natural join nt4;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/using-join.sql b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
new file mode 100644
index 0000000..336d19f
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
@@ -0,0 +1,70 @@
+create temporary view nt1 as select * from values
+  ("one", 1),
+  ("two", 2),
+  ("three", 3)
+  as nt1(k, v1);
+
+create temporary view nt2 as select * from values
+  ("one", 1),
+  ("two", 22),
+  ("one", 5),
+  ("four", 4)
+  as nt2(k, v2);
+
+SELECT * FROM nt1 left outer join nt2 using (k);
+
+SELECT k FROM nt1 left outer join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 left outer join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 left outer join nt2 using (k);
+
+SELECT * FROM nt1 left semi join nt2 using (k);
+
+SELECT k FROM nt1 left semi join nt2 using (k);
+
+SELECT nt1.* FROM nt1 left semi join nt2 using (k);
+
+SELECT nt1.k FROM nt1 left semi join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 left semi join nt2 using (k);
+
+SELECT * FROM nt1 right outer join nt2 using (k);
+
+SELECT k FROM nt1 right outer join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 right outer join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 right outer join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 right outer join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 right outer join nt2 using (k);
+
+SELECT * FROM nt1 full outer join nt2 using (k);
+
+SELECT k FROM nt1 full outer join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 full outer join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 full outer join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 full outer join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 full outer join nt2 using (k);
+
+SELECT * FROM nt1 full outer join nt2 using (k);
+
+SELECT k FROM nt1 inner join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 inner join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 inner join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 inner join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 inner join nt2 using (k);
diff --git a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
index 13f3197..794e472 100644
--- a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 29
 
 
 -- !query
@@ -27,6 +27,40 @@ struct<>
 
 
 -- !query
+create temporary view nt3 as select * from values
+  ("one", 4),
+  ("two", 5),
+  ("one", 6)
+  as nt3(k, v3)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create temporary view nt4 as select * from values
+  ("one", 7),
+  ("two", 8),
+  ("one", 9)
+  as nt4(k, v4)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one	1	1
+one	1	5
+two	2	22
+
+
+-- !query
 SELECT * FROM nt1 natural join nt2 where k = "one"
 -- !query schema
 struct<k:string,v1:int,v2:int>
@@ -62,3 +96,216 @@ SELECT count(*) FROM nt1 natural full outer join nt2
 struct<count(1):bigint>
 -- !query output
 4
+
+
+-- !query
+SELECT k FROM nt1 natural join nt2
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+two
+
+
+-- !query
+SELECT k FROM nt1 natural join nt2 where k = "one"
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+
+
+-- !query
+SELECT nt1.* FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int>
+-- !query output
+one	1
+one	1
+two	2
+
+
+-- !query
+SELECT nt2.* FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v2:int>
+-- !query output
+one	1
+one	5
+two	22
+
+
+-- !query
+SELECT sbq.* from (SELECT * FROM nt1 natural join nt2) sbq
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one	1	1
+one	1	5
+two	2	22
+
+
+-- !query
+SELECT sbq.k from (SELECT * FROM nt1 natural join nt2) sbq
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one	1	one	1
+one	1	one	5
+two	2	two	22
+
+
+-- !query
+SELECT *, nt2.k FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int,v2:int,k:string>
+-- !query output
+one	1	1	one
+one	1	5	one
+two	2	22	two
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+two	two
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one"
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+
+
+-- !query
+SELECT * FROM (SELECT * FROM nt1 natural join nt2)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one	1	1
+one	1	5
+two	2	22
+
+
+-- !query
+SELECT * FROM (SELECT nt1.*, nt2.* FROM nt1 natural join nt2)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one	1	one	1
+one	1	one	5
+two	2	two	22
+
+
+-- !query
+SELECT * FROM (SELECT nt1.v1, nt2.k FROM nt1 natural join nt2)
+-- !query schema
+struct<v1:int,k:string>
+-- !query output
+1	one
+1	one
+2	two
+
+
+-- !query
+SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+cannot resolve 'nt2.k' given input columns: [__auto_generated_subquery_name.k, __auto_generated_subquery_name.v1, __auto_generated_subquery_name.v2]; line 1 pos 7
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2 natural join nt3
+-- !query schema
+struct<k:string,v1:int,v2:int,v3:int>
+-- !query output
+one	1	1	4
+one	1	1	6
+one	1	5	4
+one	1	5	6
+two	2	22	5
+
+
+-- !query
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 natural join nt3
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int,k:string,v3:int>
+-- !query output
+one	1	one	1	one	4
+one	1	one	1	one	6
+one	1	one	5	one	4
+one	1	one	5	one	6
+two	2	two	22	two	5
+
+
+-- !query
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int,k:string,v3:int>
+-- !query output
+one	1	one	1	one	4
+one	1	one	1	one	6
+one	1	one	5	one	4
+one	1	one	5	one	6
+two	2	two	22	two	5
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2 join nt3 on nt1.k = nt3.k
+-- !query schema
+struct<k:string,v1:int,v2:int,k:string,v3:int>
+-- !query output
+one	1	1	one	4
+one	1	1	one	6
+one	1	5	one	4
+one	1	5	one	6
+two	2	22	two	5
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k
+-- !query schema
+struct<k:string,v1:int,v2:int,k:string,v3:int>
+-- !query output
+one	1	1	one	4
+one	1	1	one	6
+one	1	5	one	4
+one	1	5	one	6
+two	2	22	two	5
+
+
+-- !query
+SELECT nt1.*, nt2.*, nt3.*, nt4.* FROM nt1 natural join nt2 natural join nt3 natural join nt4
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int,k:string,v3:int,k:string,v4:int>
+-- !query output
+one	1	one	1	one	4	one	7
+one	1	one	1	one	4	one	9
+one	1	one	1	one	6	one	7
+one	1	one	1	one	6	one	9
+one	1	one	5	one	4	one	7
+one	1	one	5	one	4	one	9
+one	1	one	5	one	6	one	7
+one	1	one	5	one	6	one	9
+two	2	two	22	two	5	two	8
diff --git a/sql/core/src/test/resources/sql-tests/results/using-join.sql.out b/sql/core/src/test/resources/sql-tests/results/using-join.sql.out
new file mode 100644
index 0000000..1d2ae9d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/using-join.sql.out
@@ -0,0 +1,338 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 31
+
+
+-- !query
+create temporary view nt1 as select * from values
+  ("one", 1),
+  ("two", 2),
+  ("three", 3)
+  as nt1(k, v1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create temporary view nt2 as select * from values
+  ("one", 1),
+  ("two", 22),
+  ("one", 5),
+  ("four", 4)
+  as nt2(k, v2)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one	1	1
+one	1	5
+three	3	NULL
+two	2	22
+
+
+-- !query
+SELECT k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+three
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one	1	one	1
+one	1	one	5
+three	3	NULL	NULL
+two	2	two	22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+three	NULL
+two	two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+three	three
+two	two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+three	NULL
+two	two
+
+
+-- !query
+SELECT * FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int>
+-- !query output
+one	1
+two	2
+
+
+-- !query
+SELECT k FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+two
+
+
+-- !query
+SELECT nt1.* FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int>
+-- !query output
+one	1
+two	2
+
+
+-- !query
+SELECT nt1.k FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+two	two
+
+
+-- !query
+SELECT * FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+four	NULL	4
+one	1	1
+one	1	5
+two	2	22
+
+
+-- !query
+SELECT k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+four
+one
+one
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+NULL	NULL	four	4
+one	1	one	1
+one	1	one	5
+two	2	two	22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+NULL	four
+one	one
+one	one
+two	two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four	NULL
+one	one
+one	one
+two	two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four	four
+one	one
+one	one
+two	two
+
+
+-- !query
+SELECT * FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+four	NULL	4
+one	1	1
+one	1	5
+three	3	NULL
+two	2	22
+
+
+-- !query
+SELECT k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+four
+one
+one
+three
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+NULL	NULL	four	4
+one	1	one	1
+one	1	one	5
+three	3	NULL	NULL
+two	2	two	22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+NULL	four
+one	one
+one	one
+three	NULL
+two	two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four	NULL
+one	one
+one	one
+three	three
+two	two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four	four
+one	one
+one	one
+three	NULL
+two	two
+
+
+-- !query
+SELECT * FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+four	NULL	4
+one	1	1
+one	1	5
+three	3	NULL
+two	2	22
+
+
+-- !query
+SELECT k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one	1	one	1
+one	1	one	5
+two	2	two	22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+two	two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+two	two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one	one
+one	one
+two	two
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index c2555a1..a803fa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -477,4 +477,26 @@ class DataFrameJoinSuite extends QueryTest
 
     checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
   }
+
+  test("SPARK-34527: Resolve common columns from USING JOIN") {
+    // Full outer USING join on "a"; both inputs are aliased so each side's key can be named.
+    val joinDf = testData2.as("testData2").join(
+      testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+    // Same projection twice: alias-qualified names, then Column refs from the source DataFrames.
+    val dfQuery = joinDf.select(
+      $"a", $"testData2.a", $"testData2.b", $"testData3.a", $"testData3.b")
+    val dfQuery2 = joinDf.select(
+      $"a", testData2.col("a"), testData2.col("b"), testData3.col("a"), testData3.col("b"))
+
+    // foreach, not map: the per-query results are discarded, only the checks matter.
+    Seq(dfQuery, dfQuery2).foreach { query =>
+      checkAnswer(query, Seq(
+        Row(1, 1, 1, 1, null),
+        Row(1, 1, 2, 1, null),
+        Row(2, 2, 1, 2, 2),
+        Row(2, 2, 2, 2, 2),
+        Row(3, 3, 1, null, null),
+        Row(3, 3, 2, null, null)))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org