You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/04/14 07:02:13 UTC
[spark] branch master updated: [SPARK-34527][SQL] Resolve
duplicated common columns from USING/NATURAL JOIN
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 816f6dd [SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN
816f6dd is described below
commit 816f6dd13eb35908bab8f1524c7629a5c6d585c6
Author: Karen Feng <ka...@databricks.com>
AuthorDate: Wed Apr 14 07:01:40 2021 +0000
[SPARK-34527][SQL] Resolve duplicated common columns from USING/NATURAL JOIN
### What changes were proposed in this pull request?
Adds the duplicated common columns as hidden columns to the Projection used to rewrite NATURAL/USING JOINs.
### Why are the changes needed?
Allows users to resolve either side of the NATURAL/USING JOIN's common keys.
Previously, the user could only resolve the following columns:
| Join type | Left key columns | Right key columns |
| --- | --- | --- |
| Inner | Yes | No |
| Left | Yes | No |
| Right | No | Yes |
| Outer | No | No |
### Does this PR introduce _any_ user-facing change?
Yes. The user can now symmetrically resolve the common columns from a NATURAL/USING JOIN.
### How was this patch tested?
SQL-side tests. The behavior matches PostgreSQL and MySQL.
Closes #31666 from karenfeng/spark-34527.
Authored-by: Karen Feng <ka...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 100 +++---
.../spark/sql/catalyst/analysis/unresolved.scala | 15 +-
.../catalyst/plans/logical/AnalysisHelper.scala | 1 +
.../plans/logical/basicLogicalOperators.scala | 13 +-
.../apache/spark/sql/catalyst/util/package.scala | 26 +-
.../datasources/v2/DataSourceV2Implicits.scala | 10 +-
.../resources/sql-tests/inputs/natural-join.sql | 53 ++++
.../test/resources/sql-tests/inputs/using-join.sql | 70 +++++
.../sql-tests/results/natural-join.sql.out | 249 ++++++++++++++-
.../resources/sql-tests/results/using-join.sql.out | 338 +++++++++++++++++++++
.../org/apache/spark/sql/DataFrameJoinSuite.scala | 22 ++
11 files changed, 843 insertions(+), 54 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d32ec06..7a11396 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -917,41 +917,30 @@ class Analyzer(override val catalogManager: CatalogManager)
* Adds metadata columns to output for child relations when nodes are missing resolved attributes.
*
* References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
- * but the relation's output does not include the metadata columns until the relation is replaced
- * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
- * relation's output, the analyzer will detect that nothing produces the columns.
+ * but the relation's output does not include the metadata columns until the relation is replaced.
+ * Unless this rule adds metadata to the relation's output, the analyzer will detect that nothing
+ * produces the columns.
*
* This rule only adds metadata columns when a node is resolved but is missing input from its
* children. This ensures that metadata columns are not added to the plan unless they are used. By
* checking only resolved nodes, this ensures that * expansion is already done so that metadata
- * columns are not accidentally selected by *.
+ * columns are not accidentally selected by *. This rule resolves operators downwards to avoid
+ * projecting away metadata columns prematurely.
*/
object AddMetadataColumns extends Rule[LogicalPlan] {
- import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
- private def hasMetadataCol(plan: LogicalPlan): Boolean = {
- plan.expressions.exists(_.find {
- case a: Attribute => a.isMetadataCol
- case _ => false
- }.isDefined)
- }
-
- private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
- case r: DataSourceV2Relation => r.withMetadataColumns()
- case _ => plan.withNewChildren(plan.children.map(addMetadataCol))
- }
+ import org.apache.spark.sql.catalyst.util._
- def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+ def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+ // Add metadata output to all node types
case node if node.children.nonEmpty && node.resolved && hasMetadataCol(node) =>
val inputAttrs = AttributeSet(node.children.flatMap(_.output))
- val metaCols = node.expressions.flatMap(_.collect {
- case a: Attribute if a.isMetadataCol && !inputAttrs.contains(a) => a
- })
+ val metaCols = getMetadataAttributes(node).filterNot(inputAttrs.contains)
if (metaCols.isEmpty) {
node
} else {
val newNode = addMetadataCol(node)
- // We should not change the output schema of the plan. We should project away the extr
+ // We should not change the output schema of the plan. We should project away the extra
// metadata columns if necessary.
if (newNode.sameOutput(node)) {
newNode
@@ -960,6 +949,38 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
}
+
+ /**
+  * Collects the metadata attributes referenced by `plan`'s expressions: attributes already
+  * marked as metadata columns, or the child metadataOutput entry with the same exprId.
+  */
+ private def getMetadataAttributes(plan: LogicalPlan): Seq[Attribute] = {
+   // Flattened in child order, so `find` picks the same attribute as a child-by-child search.
+   lazy val childMetadataOutput = plan.children.flatMap(_.metadataOutput)
+   plan.expressions.flatMap(_.collect { case a: Attribute => a }.flatMap { a =>
+     if (a.isMetadataCol) Some(a) else childMetadataOutput.find(_.exprId == a.exprId)
+   })
+ }
+
+ private def hasMetadataCol(plan: LogicalPlan): Boolean = {
+   // An attribute resolved before being labeled as metadata (i.e. from the originating
+   // Dataset) does not carry the metadata flag, so it is also matched by expression ID.
+   def isChildMetadata(a: Attribute): Boolean =
+     plan.children.exists(c => c.metadataOutput.exists(_.exprId == a.exprId))
+   plan.expressions.exists(_.find {
+     case a: Attribute => a.isMetadataCol || isChildMetadata(a)
+     case _ => false
+   }.isDefined)
+ }
+
+ // Adds metadata columns to relations; a Project also forwards its own metadataOutput.
+ private def addMetadataCol(plan: LogicalPlan): LogicalPlan = plan match {
+   case relation: DataSourceV2Relation => relation.withMetadataColumns()
+   case project: Project =>
+     val widenedList = project.metadataOutput ++ project.projectList
+     project.copy(projectList = widenedList, child = addMetadataCol(project.child))
+   case other => other.withNewChildren(other.children.map(addMetadataCol))
+ }
}
/**
@@ -1898,10 +1919,10 @@ class Analyzer(override val catalogManager: CatalogManager)
}
/**
- * This method tries to resolve expressions and find missing attributes recursively. Specially,
- * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved
- * attributes which are missed from child output. This method tries to find the missing
- * attributes out and add into the projection.
+ * This method tries to resolve expressions and find missing attributes recursively.
+ * Specifically, when the expressions used in `Sort` or `Filter` contain unresolved attributes
+ * or resolved attributes which are missing from child output, this method tries to find the
+ * missing attributes and add them into the projection.
*/
private def resolveExprsAndAddMissingAttrs(
exprs: Seq[Expression], plan: LogicalPlan): (Seq[Expression], LogicalPlan) = {
@@ -3150,7 +3171,9 @@ class Analyzer(override val catalogManager: CatalogManager)
joinType: JoinType,
joinNames: Seq[String],
condition: Option[Expression],
- hint: JoinHint) = {
+ hint: JoinHint): LogicalPlan = {
+ import org.apache.spark.sql.catalyst.util._
+
val leftKeys = joinNames.map { keyName =>
left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
throw QueryCompilationErrors.unresolvedUsingColForJoinError(keyName, left, "left")
@@ -3170,26 +3193,33 @@ class Analyzer(override val catalogManager: CatalogManager)
val rUniqueOutput = right.output.filterNot(att => rightKeys.contains(att))
// the output list looks like: join keys, columns from left, columns from right
- val projectList = joinType match {
+ val (projectList, hiddenList) = joinType match {
case LeftOuter =>
- leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true))
+ (leftKeys ++ lUniqueOutput ++ rUniqueOutput.map(_.withNullability(true)), rightKeys)
case LeftExistence(_) =>
- leftKeys ++ lUniqueOutput
+ (leftKeys ++ lUniqueOutput, Seq.empty)
case RightOuter =>
- rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput
+ (rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ rUniqueOutput, leftKeys)
case FullOuter =>
// in full outer join, joinCols should be non-null if there is.
val joinedCols = joinPairs.map { case (l, r) => Alias(Coalesce(Seq(l, r)), l.name)() }
- joinedCols ++
+ (joinedCols ++
lUniqueOutput.map(_.withNullability(true)) ++
- rUniqueOutput.map(_.withNullability(true))
+ rUniqueOutput.map(_.withNullability(true)),
+ leftKeys ++ rightKeys)
case _ : InnerLike =>
- leftKeys ++ lUniqueOutput ++ rUniqueOutput
+ (leftKeys ++ lUniqueOutput ++ rUniqueOutput, rightKeys)
case _ =>
sys.error("Unsupported natural join type " + joinType)
}
- // use Project to trim unnecessary fields
- Project(projectList, Join(left, right, joinType, newCondition, hint))
+ // use Project to hide duplicated common keys
+ // propagate hidden columns from nested USING/NATURAL JOINs
+ val project = Project(projectList, Join(left, right, joinType, newCondition, hint))
+ project.setTagValue(
+ Project.hiddenOutputTag,
+ hiddenList.map(_.markAsSupportsQualifiedStar()) ++
+ project.child.metadataOutput.filter(_.supportsQualifiedStar))
+ project
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 3b2f4ca..5001e2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, UnaryNode}
-import org.apache.spark.sql.catalyst.util.quoteIfNeeded
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.types.{DataType, Metadata, StructType}
@@ -340,11 +340,11 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
* Returns true if the nameParts is a subset of the last elements of qualifier of the attribute.
*
* For example, the following should all return true:
- * - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", "ns2", "t") and
+ * - `SELECT ns1.ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns1", "ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
- * - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and
+ * - `SELECT ns2.t.* FROM ns1.ns2.t` where nameParts is Seq("ns2", "t") and
* qualifier is Seq("ns1", "ns2", "t").
- * - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and
+ * - `SELECT t.* FROM ns1.ns2.t` where nameParts is Seq("t") and
* qualifier is Seq("ns1", "ns2", "t").
*/
private def matchedQualifier(
@@ -366,10 +366,13 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevalu
override def expand(
input: LogicalPlan,
resolver: Resolver): Seq[NamedExpression] = {
- // If there is no table specified, use all input attributes.
+ // If there is no table specified, use all non-hidden input attributes.
if (target.isEmpty) return input.output
- val expandedAttributes = input.output.filter(matchedQualifier(_, target.get, resolver))
+ // If there is a table specified, use hidden input attributes as well
+ val hiddenOutput = input.metadataOutput.filter(_.supportsQualifiedStar)
+ val expandedAttributes = (hiddenOutput ++ input.output).filter(
+ matchedQualifier(_, target.get, resolver))
if (expandedAttributes.nonEmpty) return expandedAttributes
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
index 5a888de..9e9bc69 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala
@@ -145,6 +145,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan =>
self.markRuleAsIneffective(ruleId)
self
} else {
+ afterRule.copyTagsFrom(self)
afterRule
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 6ebb1be..b31e930 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -25,17 +25,18 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.trees.TreePattern.{
INNER_LIKE_JOIN, JOIN, LEFT_SEMI_OR_ANTI_JOIN, NATURAL_LIKE_JOIN, OUTER_JOIN, TreePattern
}
-import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.random.RandomSampler
/**
- * When planning take() or collect() operations, this special node that is inserted at the top of
+ * When planning take() or collect() operations, this special node is inserted at the top of
* the logical plan before invoking the query planner.
*
* Rules can pattern-match on this node in order to apply transformations that only take effect
@@ -69,7 +70,6 @@ object Subquery {
case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
extends OrderPreservingUnaryNode {
override def output: Seq[Attribute] = projectList.map(_.toAttribute)
- override def metadataOutput: Seq[Attribute] = Nil
override def maxRows: Option[Long] = child.maxRows
override lazy val resolved: Boolean = {
@@ -86,10 +86,17 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan)
override lazy val validConstraints: ExpressionSet =
getAllValidConstraints(projectList)
+ override def metadataOutput: Seq[Attribute] =
+ getTagValue(Project.hiddenOutputTag).getOrElse(Nil)
+
override protected def withNewChildInternal(newChild: LogicalPlan): Project =
copy(child = newChild)
}
+object Project {
+ val hiddenOutputTag: TreeNodeTag[Seq[Attribute]] = TreeNodeTag[Seq[Attribute]]("hidden_output")
+}
+
/**
* Applies a [[Generator]] to a stream of input rows, combining the
* output of each into a new stream of rows. This operation is similar to a `flatMap` in functional
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 4f8fcd9..33fe48d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{NumericType, StringType}
+import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -201,4 +201,28 @@ package object util extends Logging {
def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = {
truncatedString(seq, "", sep, "", maxFields)
}
+
+ val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+ implicit class MetadataColumnHelper(attr: Attribute) {
+   /** If set, this metadata column is a candidate during qualified star expansions. */
+   val SUPPORTS_QUALIFIED_STAR = "__supports_qualified_star"
+
+   // True only when `key` is present in the attribute's metadata and set to true.
+   private def isFlagged(key: String): Boolean =
+     attr.metadata.contains(key) && attr.metadata.getBoolean(key)
+
+   def isMetadataCol: Boolean = isFlagged(METADATA_COL_ATTR_KEY)
+
+   def supportsQualifiedStar: Boolean = isMetadataCol && isFlagged(SUPPORTS_QUALIFIED_STAR)
+
+   // Returns the attribute with both the metadata-column and qualified-star flags set.
+   def markAsSupportsQualifiedStar(): Attribute = {
+     val flagged = new MetadataBuilder()
+       .withMetadata(attr.metadata)
+       .putBoolean(METADATA_COL_ATTR_KEY, true)
+       .putBoolean(SUPPORTS_QUALIFIED_STAR, true)
+     attr.withMetadata(flagged.build())
+   }
+ }
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index c0d24e2..efd3ffe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -20,15 +20,14 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.util.METADATA_COL_ATTR_KEY
import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
object DataSourceV2Implicits {
- private val METADATA_COL_ATTR_KEY = "__metadata_col"
-
implicit class TableHelper(table: Table) {
def asReadable: SupportsRead = {
table match {
@@ -101,11 +100,6 @@ object DataSourceV2Implicits {
def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
}
- implicit class MetadataColumnHelper(attr: Attribute) {
- def isMetadataCol: Boolean = attr.metadata.contains(METADATA_COL_ATTR_KEY) &&
- attr.metadata.getBoolean(METADATA_COL_ATTR_KEY)
- }
-
implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
index 71a5015..060f15e 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/natural-join.sql
@@ -10,6 +10,19 @@ create temporary view nt2 as select * from values
("one", 5)
as nt2(k, v2);
+create temporary view nt3 as select * from values
+ ("one", 4),
+ ("two", 5),
+ ("one", 6)
+ as nt3(k, v3);
+
+create temporary view nt4 as select * from values
+ ("one", 7),
+ ("two", 8),
+ ("one", 9)
+ as nt4(k, v4);
+
+SELECT * FROM nt1 natural join nt2;
SELECT * FROM nt1 natural join nt2 where k = "one";
@@ -18,3 +31,43 @@ SELECT * FROM nt1 natural left join nt2 order by v1, v2;
SELECT * FROM nt1 natural right join nt2 order by v1, v2;
SELECT count(*) FROM nt1 natural full outer join nt2;
+
+SELECT k FROM nt1 natural join nt2;
+
+SELECT k FROM nt1 natural join nt2 where k = "one";
+
+SELECT nt1.* FROM nt1 natural join nt2;
+
+SELECT nt2.* FROM nt1 natural join nt2;
+
+SELECT sbq.* from (SELECT * FROM nt1 natural join nt2) sbq;
+
+SELECT sbq.k from (SELECT * FROM nt1 natural join nt2) sbq;
+
+SELECT nt1.*, nt2.* FROM nt1 natural join nt2;
+
+SELECT *, nt2.k FROM nt1 natural join nt2;
+
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2;
+
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one";
+
+SELECT * FROM (SELECT * FROM nt1 natural join nt2);
+
+SELECT * FROM (SELECT nt1.*, nt2.* FROM nt1 natural join nt2);
+
+SELECT * FROM (SELECT nt1.v1, nt2.k FROM nt1 natural join nt2);
+
+SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2);
+
+SELECT * FROM nt1 natural join nt2 natural join nt3;
+
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 natural join nt3;
+
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k;
+
+SELECT * FROM nt1 natural join nt2 join nt3 on nt1.k = nt3.k;
+
+SELECT * FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k;
+
+SELECT nt1.*, nt2.*, nt3.*, nt4.* FROM nt1 natural join nt2 natural join nt3 natural join nt4;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/using-join.sql b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
new file mode 100644
index 0000000..336d19f
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/using-join.sql
@@ -0,0 +1,70 @@
+create temporary view nt1 as select * from values
+ ("one", 1),
+ ("two", 2),
+ ("three", 3)
+ as nt1(k, v1);
+
+create temporary view nt2 as select * from values
+ ("one", 1),
+ ("two", 22),
+ ("one", 5),
+ ("four", 4)
+ as nt2(k, v2);
+
+SELECT * FROM nt1 left outer join nt2 using (k);
+
+SELECT k FROM nt1 left outer join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 left outer join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 left outer join nt2 using (k);
+
+SELECT * FROM nt1 left semi join nt2 using (k);
+
+SELECT k FROM nt1 left semi join nt2 using (k);
+
+SELECT nt1.* FROM nt1 left semi join nt2 using (k);
+
+SELECT nt1.k FROM nt1 left semi join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 left semi join nt2 using (k);
+
+SELECT * FROM nt1 right outer join nt2 using (k);
+
+SELECT k FROM nt1 right outer join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 right outer join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 right outer join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 right outer join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 right outer join nt2 using (k);
+
+SELECT * FROM nt1 full outer join nt2 using (k);
+
+SELECT k FROM nt1 full outer join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 full outer join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 full outer join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 full outer join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 full outer join nt2 using (k);
+
+SELECT * FROM nt1 full outer join nt2 using (k);
+
+SELECT k FROM nt1 inner join nt2 using (k);
+
+SELECT nt1.*, nt2.* FROM nt1 inner join nt2 using (k);
+
+SELECT nt1.k, nt2.k FROM nt1 inner join nt2 using (k);
+
+SELECT k, nt1.k FROM nt1 inner join nt2 using (k);
+
+SELECT k, nt2.k FROM nt1 inner join nt2 using (k);
diff --git a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
index 13f3197..794e472 100644
--- a/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/natural-join.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 6
+-- Number of queries: 29
-- !query
@@ -27,6 +27,40 @@ struct<>
-- !query
+create temporary view nt3 as select * from values
+ ("one", 4),
+ ("two", 5),
+ ("one", 6)
+ as nt3(k, v3)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create temporary view nt4 as select * from values
+ ("one", 7),
+ ("two", 8),
+ ("one", 9)
+ as nt4(k, v4)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one 1 1
+one 1 5
+two 2 22
+
+
+-- !query
SELECT * FROM nt1 natural join nt2 where k = "one"
-- !query schema
struct<k:string,v1:int,v2:int>
@@ -62,3 +96,216 @@ SELECT count(*) FROM nt1 natural full outer join nt2
struct<count(1):bigint>
-- !query output
4
+
+
+-- !query
+SELECT k FROM nt1 natural join nt2
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+two
+
+
+-- !query
+SELECT k FROM nt1 natural join nt2 where k = "one"
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+
+
+-- !query
+SELECT nt1.* FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int>
+-- !query output
+one 1
+one 1
+two 2
+
+
+-- !query
+SELECT nt2.* FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v2:int>
+-- !query output
+one 1
+one 5
+two 22
+
+
+-- !query
+SELECT sbq.* from (SELECT * FROM nt1 natural join nt2) sbq
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one 1 1
+one 1 5
+two 2 22
+
+
+-- !query
+SELECT sbq.k from (SELECT * FROM nt1 natural join nt2) sbq
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query
+SELECT *, nt2.k FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,v1:int,v2:int,k:string>
+-- !query output
+one 1 1 one
+one 1 5 one
+two 2 22 two
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+two two
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 natural join nt2 where k = "one"
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+
+
+-- !query
+SELECT * FROM (SELECT * FROM nt1 natural join nt2)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one 1 1
+one 1 5
+two 2 22
+
+
+-- !query
+SELECT * FROM (SELECT nt1.*, nt2.* FROM nt1 natural join nt2)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query
+SELECT * FROM (SELECT nt1.v1, nt2.k FROM nt1 natural join nt2)
+-- !query schema
+struct<v1:int,k:string>
+-- !query output
+1 one
+1 one
+2 two
+
+
+-- !query
+SELECT nt2.k FROM (SELECT * FROM nt1 natural join nt2)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+cannot resolve 'nt2.k' given input columns: [__auto_generated_subquery_name.k, __auto_generated_subquery_name.v1, __auto_generated_subquery_name.v2]; line 1 pos 7
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2 natural join nt3
+-- !query schema
+struct<k:string,v1:int,v2:int,v3:int>
+-- !query output
+one 1 1 4
+one 1 1 6
+one 1 5 4
+one 1 5 6
+two 2 22 5
+
+
+-- !query
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 natural join nt3
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int,k:string,v3:int>
+-- !query output
+one 1 one 1 one 4
+one 1 one 1 one 6
+one 1 one 5 one 4
+one 1 one 5 one 6
+two 2 two 22 two 5
+
+
+-- !query
+SELECT nt1.*, nt2.*, nt3.* FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int,k:string,v3:int>
+-- !query output
+one 1 one 1 one 4
+one 1 one 1 one 6
+one 1 one 5 one 4
+one 1 one 5 one 6
+two 2 two 22 two 5
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2 join nt3 on nt1.k = nt3.k
+-- !query schema
+struct<k:string,v1:int,v2:int,k:string,v3:int>
+-- !query output
+one 1 1 one 4
+one 1 1 one 6
+one 1 5 one 4
+one 1 5 one 6
+two 2 22 two 5
+
+
+-- !query
+SELECT * FROM nt1 natural join nt2 join nt3 on nt2.k = nt3.k
+-- !query schema
+struct<k:string,v1:int,v2:int,k:string,v3:int>
+-- !query output
+one 1 1 one 4
+one 1 1 one 6
+one 1 5 one 4
+one 1 5 one 6
+two 2 22 two 5
+
+
+-- !query
+SELECT nt1.*, nt2.*, nt3.*, nt4.* FROM nt1 natural join nt2 natural join nt3 natural join nt4
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int,k:string,v3:int,k:string,v4:int>
+-- !query output
+one 1 one 1 one 4 one 7
+one 1 one 1 one 4 one 9
+one 1 one 1 one 6 one 7
+one 1 one 1 one 6 one 9
+one 1 one 5 one 4 one 7
+one 1 one 5 one 4 one 9
+one 1 one 5 one 6 one 7
+one 1 one 5 one 6 one 9
+two 2 two 22 two 5 two 8
diff --git a/sql/core/src/test/resources/sql-tests/results/using-join.sql.out b/sql/core/src/test/resources/sql-tests/results/using-join.sql.out
new file mode 100644
index 0000000..1d2ae9d
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/using-join.sql.out
@@ -0,0 +1,338 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 31
+
+
+-- !query
+create temporary view nt1 as select * from values
+ ("one", 1),
+ ("two", 2),
+ ("three", 3)
+ as nt1(k, v1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+create temporary view nt2 as select * from values
+ ("one", 1),
+ ("two", 22),
+ ("one", 5),
+ ("four", 4)
+ as nt2(k, v2)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT * FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+one 1 1
+one 1 5
+three 3 NULL
+two 2 22
+
+
+-- !query
+SELECT k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+three
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one 1 one 1
+one 1 one 5
+three 3 NULL NULL
+two 2 two 22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+three NULL
+two two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+three three
+two two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 left outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+three NULL
+two two
+
+
+-- !query
+SELECT * FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int>
+-- !query output
+one 1
+two 2
+
+
+-- !query
+SELECT k FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+two
+
+
+-- !query
+SELECT nt1.* FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int>
+-- !query output
+one 1
+two 2
+
+
+-- !query
+SELECT nt1.k FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 left semi join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+two two
+
+
+-- !query
+SELECT * FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+four NULL 4
+one 1 1
+one 1 5
+two 2 22
+
+
+-- !query
+SELECT k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+four
+one
+one
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+NULL NULL four 4
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+NULL four
+one one
+one one
+two two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four NULL
+one one
+one one
+two two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 right outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four four
+one one
+one one
+two two
+
+
+-- !query
+SELECT * FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+four NULL 4
+one 1 1
+one 1 5
+three 3 NULL
+two 2 22
+
+
+-- !query
+SELECT k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+four
+one
+one
+three
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+NULL NULL four 4
+one 1 one 1
+one 1 one 5
+three 3 NULL NULL
+two 2 two 22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+NULL four
+one one
+one one
+three NULL
+two two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four NULL
+one one
+one one
+three three
+two two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+four four
+one one
+one one
+three NULL
+two two
+
+
+-- !query
+SELECT * FROM nt1 full outer join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,v2:int>
+-- !query output
+four NULL 4
+one 1 1
+one 1 5
+three 3 NULL
+two 2 22
+
+
+-- !query
+SELECT k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string>
+-- !query output
+one
+one
+two
+
+
+-- !query
+SELECT nt1.*, nt2.* FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,v1:int,k:string,v2:int>
+-- !query output
+one 1 one 1
+one 1 one 5
+two 2 two 22
+
+
+-- !query
+SELECT nt1.k, nt2.k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+two two
+
+
+-- !query
+SELECT k, nt1.k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+two two
+
+
+-- !query
+SELECT k, nt2.k FROM nt1 inner join nt2 using (k)
+-- !query schema
+struct<k:string,k:string>
+-- !query output
+one one
+one one
+two two
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index c2555a1..a803fa8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -477,4 +477,26 @@ class DataFrameJoinSuite extends QueryTest
checkAnswer(df3.except(df4), Row(10, 50, 2, Row(10, 50, 2)))
}
+
+ test("SPARK-34527: Resolve common columns from USING JOIN") {
+   val joinDf = testData2.as("testData2").join(
+     testData3.as("testData3"), usingColumns = Seq("a"), joinType = "fullouter")
+   // Reference each side's join key both by alias-qualified name and by Dataset column.
+   val dfQuery = joinDf.select(
+     $"a", $"testData2.a", $"testData2.b", $"testData3.a", $"testData3.b")
+   val dfQuery2 = joinDf.select(
+     $"a", testData2.col("a"), testData2.col("b"), testData3.col("a"), testData3.col("b"))
+
+   // checkAnswer is called only for its side effect, so iterate with foreach rather than map.
+   Seq(dfQuery, dfQuery2).foreach { query =>
+     checkAnswer(query,
+       Seq(
+         Row(1, 1, 1, 1, null),
+         Row(1, 1, 2, 1, null),
+         Row(2, 2, 1, 2, 2),
+         Row(2, 2, 2, 2, 2),
+         Row(3, 3, 1, null, null),
+         Row(3, 3, 2, null, null)))
+   }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org