Posted to commits@spark.apache.org by ya...@apache.org on 2020/02/23 08:48:56 UTC

[spark] branch branch-3.0 updated: [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f30f50a  [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
f30f50a is described below

commit f30f50a76f4b9fb5e652620563fb9055c5f30521
Author: yi.wu <yi...@databricks.com>
AuthorDate: Sun Feb 23 17:46:19 2020 +0900

    [SPARK-30844][SQL] Static partition should also follow StoreAssignmentPolicy when insert into table
    
    ### What changes were proposed in this pull request?
    
    Make static partitions also follow `StoreAssignmentPolicy` when inserting into a table:
    
    if `StoreAssignmentPolicy=LEGACY`, use `Cast`;
    if `StoreAssignmentPolicy=ANSI | STRICT`, use `AnsiCast`.
    
    E.g., for the table `t` created by:
    
    ```
    create table t(a int, b string) using parquet partitioned by (a)
    ```
    and inserting values with `StoreAssignmentPolicy=ANSI` using:
    ```
    insert into t partition(a='ansi') values('ansi')
    ```
    
    Before this PR:
    
    ```
    +----+----+
    |   b|   a|
    +----+----+
    |ansi|null|
    +----+----+
    ```
    
    After this PR, insert will fail by:
    ```
    java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
    ```
    
    (It would be better if we could use `TableOutputResolver.checkField` to fully follow `StoreAssignmentPolicy`. But since the data type of the static partition value is lost in the first place, it's hard to use `TableOutputResolver.checkField`.)
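    
    The sketch below is not part of this patch; it only illustrates the mechanism, using the same Spark 3.0 internal expressions the diff touches. Because the parser hands the static partition value over as a string literal, the ANSI/STRICT path wraps it in `AnsiCast`, which fails at runtime when the target partition column is numeric:
    
    ```
    // Illustrative sketch (not from this commit): why AnsiCast rejects a
    // non-numeric static partition value for an int partition column.
    import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Literal}
    import org.apache.spark.sql.types.IntegerType
    
    val partValue = Literal("ansi")                 // parser keeps static partition values as strings
    val ansiCast = AnsiCast(partValue, IntegerType, None)
    ansiCast.eval()                                 // java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
    ```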
    
    ### Why are the changes needed?
    
    I think we should follow `StoreAssignmentPolicy` when inserting into a table for any column, including static partition columns.
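    
    As a rough end-to-end sketch (assuming a running `SparkSession` named `spark`; not taken from this patch), the difference between the policies looks like:
    
    ```
    // Hedged sketch, not from this commit: observing the policy from user code.
    spark.conf.set("spark.sql.storeAssignmentPolicy", "LEGACY")
    spark.sql("create table t(a int, b string) using parquet partitioned by (a)")
    spark.sql("insert into t partition(a='ansi') values('ansi')")  // silently writes a = null
    
    spark.conf.set("spark.sql.storeAssignmentPolicy", "ANSI")
    spark.sql("insert into t partition(a='ansi') values('ansi')")
    // java.lang.NumberFormatException: invalid input syntax for type numeric: ansi
    ```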
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new test.
    
    Closes #27597 from Ngone51/fix-static-partition.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
    (cherry picked from commit 9c2eadc7268844d49ec41da818002c99bb56addf)
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../execution/datasources/DataSourceStrategy.scala  | 13 ++++++++++++-
 .../spark/sql/sources/DataSourceAnalysisSuite.scala | 10 ++++++++--
 .../org/apache/spark/sql/sources/InsertSuite.scala  | 21 +++++++++++++++++++++
 3 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e3a0a0a..2d902b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -104,7 +105,17 @@ case class DataSourceAnalysis(conf: SQLConf) extends Rule[LogicalPlan] with Cast
         None
       } else if (potentialSpecs.size == 1) {
         val partValue = potentialSpecs.head._2
-        Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+        conf.storeAssignmentPolicy match {
+          // SPARK-30844: try our best to follow StoreAssignmentPolicy for static partition
+          // values but not completely follow because we can't do static type checking due to
+          // the reason that the parser has erased the type info of static partition values
+          // and converted them to string.
+          case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+            Some(Alias(AnsiCast(Literal(partValue), field.dataType,
+              Option(conf.sessionLocalTimeZone)), field.name)())
+          case _ =>
+            Some(Alias(cast(Literal(partValue), field.dataType), field.name)())
+        }
       } else {
         throw new AnalysisException(
           s"Partition column ${field.name} have multiple values specified, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
index e1022e3..a6c5090 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceAnalysisSuite.scala
@@ -22,9 +22,10 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, Attribute, Cast, Expression, Literal}
 import org.apache.spark.sql.execution.datasources.DataSourceAnalysis
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
 
 class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -52,7 +53,12 @@ class DataSourceAnalysisSuite extends SparkFunSuite with BeforeAndAfterAll {
   Seq(true, false).foreach { caseSensitive =>
     val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)
     def cast(e: Expression, dt: DataType): Expression = {
-      Cast(e, dt, Option(conf.sessionLocalTimeZone))
+      conf.storeAssignmentPolicy match {
+        case StoreAssignmentPolicy.ANSI | StoreAssignmentPolicy.STRICT =>
+          AnsiCast(e, dt, Option(conf.sessionLocalTimeZone))
+        case _ =>
+          Cast(e, dt, Option(conf.sessionLocalTimeZone))
+      }
     }
     val rule = DataSourceAnalysis(conf)
     test(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index bcff30a..2d66637 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -753,6 +753,27 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-30844: static partition should also follow StoreAssignmentPolicy") {
+    SQLConf.StoreAssignmentPolicy.values.foreach { policy =>
+      withSQLConf(
+        SQLConf.STORE_ASSIGNMENT_POLICY.key -> policy.toString) {
+        withTable("t") {
+          sql("create table t(a int, b string) using parquet partitioned by (a)")
+          policy match {
+            case SQLConf.StoreAssignmentPolicy.ANSI | SQLConf.StoreAssignmentPolicy.STRICT =>
+              val errorMsg = intercept[NumberFormatException] {
+                sql("insert into t partition(a='ansi') values('ansi')")
+              }.getMessage
+              assert(errorMsg.contains("invalid input syntax for type numeric: ansi"))
+            case SQLConf.StoreAssignmentPolicy.LEGACY =>
+              sql("insert into t partition(a='ansi') values('ansi')")
+              checkAnswer(sql("select * from t"), Row("ansi", null) :: Nil)
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-24860: dynamic partition overwrite specified per source without catalog table") {
     withTempPath { path =>
       Seq((1, 1), (2, 2)).toDF("i", "part")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org