You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2023/10/23 17:52:10 UTC
[spark] branch master updated: [SPARK-44735][SQL] Add warning msg when inserting columns with the same name by row that don't match up
This is an automated email from the ASF dual-hosted git repository.
holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ae100267c28b [SPARK-44735][SQL] Add warning msg when inserting columns with the same name by row that don't match up
ae100267c28b is described below
commit ae100267c28bc6fd2c2f9c880ed3df1999423992
Author: Jia Fan <fa...@qq.com>
AuthorDate: Mon Oct 23 10:51:31 2023 -0700
[SPARK-44735][SQL] Add warning msg when inserting columns with the same name by row that don't match up
### What changes were proposed in this pull request?
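This PR adds a warning message for inserts that match columns by position (not by name) when the query columns have the same names as the table columns but appear in a different order. The warning tells the user that `INSERT INTO BY NAME` can be used to reorder the query columns to match the table schema.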
The warning looks like this:
![image](https://github.com/apache/spark/assets/32387433/18e57125-8a2e-407c-a3fd-93a9cbf122a1)
### Why are the changes needed?
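To improve the user experience: users are alerted when a positional insert's query columns have the same names as the table columns but in a different order.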
### Does this PR introduce _any_ user-facing change?
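Yes. A warning is now logged when a non-by-name insert has query columns with the same names as the table columns but in a different order.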
### How was this patch tested?
Tested locally.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42763 from Hisoka-X/SPARK-44735_add_warning_for_by_name.
Authored-by: Jia Fan <fa...@qq.com>
Signed-off-by: Holden Karau <ho...@pigscanfly.ca>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 ++
.../sql/catalyst/analysis/TableOutputResolver.scala | 17 ++++++++++++++++-
.../apache/spark/sql/execution/datasources/rules.scala | 5 ++++-
3 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0469fb29a6fc..06d949ece262 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3421,6 +3421,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
case v2Write: V2WriteCommand
if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved =>
validateStoreAssignmentPolicy()
+ TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
+ expected = v2Write.table.output, queryOutput = v2Write.query.output)
val projection = TableOutputResolver.resolveOutputColumns(
v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
if (projection != v2Write.query) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index d41757725771..1398552399cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralType, MapType, StructType}
-object TableOutputResolver {
+object TableOutputResolver extends SQLConfHelper with Logging {
def resolveVariableOutputColumns(
expected: Seq[VariableReference],
@@ -470,6 +472,19 @@ object TableOutputResolver {
}
}
+ def suitableForByNameCheck(
+ byName: Boolean,
+ expected: Seq[Attribute],
+ queryOutput: Seq[Attribute]): Unit = {
+ if (!byName && expected.size == queryOutput.size &&
+ expected.forall(e => queryOutput.exists(p => conf.resolver(p.name, e.name))) &&
+ expected.zip(queryOutput).exists(e => !conf.resolver(e._1.name, e._2.name))) {
+ logWarning("The query columns and the table columns have same names but different " +
+ "orders. You can use INSERT [INTO | OVERWRITE] BY NAME to reorder the query columns to " +
+ "align with the table columns.")
+ }
+ }
+
private def containsIntegralOrDecimalType(dt: DataType): Boolean = dt match {
case _: IntegralType | _: DecimalType => true
case a: ArrayType => containsIntegralOrDecimalType(a.elementType)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c5e86ee2d03e..65ebbb57fd32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -404,11 +404,14 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
insert.query
}
val newQuery = try {
+ val byName = hasColumnList || insert.byName
+ TableOutputResolver.suitableForByNameCheck(byName, expected = expectedColumns,
+ queryOutput = query.output)
TableOutputResolver.resolveOutputColumns(
tblName,
expectedColumns,
query,
- byName = hasColumnList || insert.byName,
+ byName,
conf,
supportColDefaultValue = true)
} catch {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org