Posted to commits@spark.apache.org by ya...@apache.org on 2020/02/23 08:48:56 UTC
[spark] branch branch-3.0 updated: [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f30f50a [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
f30f50a is described below
commit f30f50a76f4b9fb5e652620563fb9055c5f30521
Author: yi.wu <yi...@databricks.com>
AuthorDate: Sun Feb 23 17:46:19 2020 +0900
[SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
### What changes were proposed in this pull request?
Make static partition values also follow `StoreAssignmentPolicy` when inserting into a table:
if `StoreAssignmentPolicy=LEGACY`, use `Cast`;
if `StoreAssignmentPolicy=ANSI | STRICT`, use `AnsiCast` (see the expression-level sketch at the end of this section).
E.g., for the table `t` created by:
```
create table t(a int, b string) using parquet partitioned by (a)
```
and insert values with `StoreAssignmentPolicy=ANSI` using:
```
insert into t partition(a='ansi') values('ansi')
```
Before this PR:
```
+----+----+
| b| a|
+----+----+
|ansi|null|
+----+----+
```
After this PR, the insert fails with:
```
java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
```
(It would be better if we could use `TableOutputResolver.checkField` to fully follow `StoreAssignmentPolicy`, but since the data type of the static partition value has already been lost by that point, it is hard to use `TableOutputResolver.checkField`.)
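To make the difference between the two cast behaviors concrete, here is a minimal sketch (illustration only, not part of this patch) of how the two Catalyst expressions handle the static partition value from the example above; `Cast` and `AnsiCast` are the same expressions used in the diff below:
```
import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Cast, Literal}
import org.apache.spark.sql.types.IntegerType

// LEGACY policy: Cast silently returns null for an unparsable string,
// which is how partition column `a` ended up null before this patch.
Cast(Literal("ansi"), IntegerType).eval()
// => null

// ANSI/STRICT policy: AnsiCast fails fast instead of producing null.
AnsiCast(Literal("ansi"), IntegerType).eval()
// => throws java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
```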
### Why are the changes needed?
I think we should follow `StoreAssignmentPolicy` for all columns when inserting into a table, including static partitions.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Added a new test.
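For reference, a hedged end-to-end sketch of what the new test exercises, runnable in spark-shell (the `spark` session and the table name `t` are assumptions for illustration):
```
// Select the ANSI store assignment policy for this session.
spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
spark.sql("create table t(a int, b string) using parquet partitioned by (a)")
// With this patch the static partition value goes through AnsiCast, so the
// insert below throws java.lang.NumberFormatException instead of silently
// writing a null value for partition column `a`.
spark.sql("insert into t partition(a='ansi') values('ansi')")
```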
Closes #27597 from Ngone51/fix-static-partition.
Authored-by: yi.wu <yi...@databricks.com>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
(cherry picked from commit 9c2eadc7268844d49ec41da818002c99bb56addf)
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../execution/datasources/DataSourceStrategy.scala | 13 ++++++++++++-
.../spark/sql/sources/DataSourceAnalysisSuite.scala | 10 ++++++++--
.../org/apache/spark/sql/sources/InsertSuite.scala | 21 +++++++++++++++++++++
3 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e3a0a0a..2d902b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -104,7 +105,17 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
None
} else if (potentialSpecs.size == 1) {
val partValue = potentialSpecs.head._2
- Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+ conf.storeAssignmentPolicy match {
+ // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
+ // values but not completely follow because we can't do static type checking due to
+ // the reason that the parser has erased the type info of static partition values
+ // and converted them to string.
+ case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+ Some(Alias(AnsiCast(Literal(partValue), field.dataType,
+ Option(conf.sessionLocalTimeZone)), field.name)())
+ case _ =>
+ Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+ }
} else {
throw new AnalysisException(
s"Partition column ${field.name} have multiple values specified, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
index e1022e3..a6c5090 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
@@ -22,9 +22,10 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.execution.datasources.DataSourceAnalysis
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -52,7 +53,12 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
Seq(true, false).foreach { caseSensitive =>
val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)
def cast(e: Expression, dt: DataType): Expression = {
- Cast(e, dt, Option(conf.sessionLocalTimeZone))
+ conf.storeAssignmentPolicy match {
+ case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+ AnsiCast(e, dt, Option(conf.sessionLocalTimeZone))
+ case _ =>
+ Cast(e, dt, Option(conf.sessionLocalTimeZone))
+ }
}
val rule = DataSourceAnalysis(conf)
test(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index bcff30a..2d66637 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -753,6 +753,27 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
}
+ test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
+ SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
+ withSQLConf(
+ SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
+ withTable("t") {
+ sql("create table t(a int, b string) using parquet partitioned by (a)")
+ policy match {
+ case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
+ val errorMsg = intercept[NumberFormatException] {
+ sql("insert into t partition(a='ansi') values('ansi')")
+ }.getMessage
+ assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
+ case SQLConf.StoreAssignmentPolicy.LEGACY =>
+ sql("insert into t partition(a='ansi') values('ansi')")
+ checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
+ }
+ }
+ }
+ }
+ }
+
test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
withTempPath { path =>
Seq((1, 1), (2, 2)).toDF("i", "part")