Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/15 03:45:13 UTC

[GitHub] [spark] LantaoJin commented on issue #25840: [SPARK-29166][SQL] Add parameters to limit the number of dynamic partitions for data source table

LantaoJin commented on issue #25840: [SPARK-29166][SQL] Add parameters to limit the number of dynamic partitions for data source table
URL: https://github.com/apache/spark/pull/25840#issuecomment-542023092
 
 
   > They should also work for data source tables as we store partitions in hive metastore.
   
   Based on the testing below, they do not seem to work at all. @cloud-fan
   ```scala
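      // Excerpt from a Spark SQL test suite; assumes the usual test helpers
      // (withSQLConf, withTable, checkAnswer) from Spark's QueryTest / SQLTestUtils traits.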
     test("SPARK-29166: dynamic partition on data source table overwrite with hive conf") {
       withSQLConf(
         SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
         withTable("dynamic_partition", "dynamic_partition_bucket") {
           sql(
             """
               |create table dynamic_partition(i int, part1 int, part2 int) using parquet
               |partitioned by (part1, part2)
             """.stripMargin)
   
           // no restriction
           sql("insert overwrite table dynamic_partition partition(part1=2, part2)" +
             " select 2, explode(array(2, 3, 4, 5))")
           checkAnswer(spark.table("dynamic_partition"),
             Seq(Row(2, 2, 2), Row(2, 2, 3), Row(2, 2, 4), Row(2, 2, 5)))
   
           // task level restriction
           withSQLConf("spark.hive.exec.max.dynamic.partitions.pernode" -> "2") {
   //          sql("set spark.hive.exec.max.dynamic.partitions.pernode=2")
             val e1 = intercept[Exception] {
               sql("insert overwrite table dynamic_partition partition(part1=2, part2)" +
                 " select 2, explode(array(2, 3, 4, 5))")
             }
   //          checkAnswer(spark.table("dynamic_partition"),
   //            Seq(Row(2, 2, 2), Row(2, 2, 3), Row(2, 2, 4), Row(2, 2, 5)))
           }
   
           // total partitions restriction
           withSQLConf("spark.hive.exec.max.dynamic.partitions" -> "3") {
   //          sql("set spark.hive.exec.max.dynamic.partitions=3")
             val e2 = intercept[Exception] {
               sql("insert overwrite table dynamic_partition partition(part1=2, part2)" +
                 " select 2, explode(array(2, 3, 4, 5))")
             }
   //          checkAnswer(spark.table("dynamic_partition"),
   //            Seq(Row(2, 2, 2), Row(2, 2, 3), Row(2, 2, 4), Row(2, 2, 5)))
           }
   
           // total partitions restriction with multiple tasks
           withSQLConf("spark.hive.exec.max.dynamic.partitions" -> "3") {
   //          sql("set spark.hive.exec.max.dynamic.partitions=3")
             val e3 = intercept[Exception] {
               sql("insert overwrite table dynamic_partition partition(part1=2, part2)" +
                 " select 2, explode(array(2, 3, 4, 5)) as id distribute by id")
             }
   //          checkAnswer(spark.table("dynamic_partition"),
   //            Seq(Row(2, 2, 2), Row(2, 2, 3), Row(2, 2, 4), Row(2, 2, 5)))
           }
   
           // total files restriction
           withSQLConf("spark.hive.exec.max.dynamic.partitions" -> "3",
             "spark.hive.exec.max.created.files" -> "3") {
   //          sql("set spark.hive.exec.max.dynamic.partitions=3")
   //          sql("set spark.hive.exec.max.created.files=3")
             sql(
               """
                 |create table dynamic_partition_bucket(i int, part1 int, part2 int) using parquet
                 |partitioned by (part1, part2)
                 |clustered by (i) into 3 buckets
               """.stripMargin)
             val e4 = intercept[Exception] {
               // we use same partition (part1=2/part2=2) to make total partitions less then 3
               sql("insert overwrite table dynamic_partition_bucket partition(part1=2, part2)" +
                 "select cast(substring(rand(), 3, 4) as int) as id, " +
                 "explode(array(2, 2, 2, 2, 2, 2)) distribute by id")
             }
           }
         }
       }
     }
   ```
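    
    For reference, a minimal standalone sketch of the same behavior (the table name `t` and the values are made up; it assumes a plain spark-shell session and sets the Hive limit the same way `withSQLConf` does above):
    
    ```scala
    // Hypothetical spark-shell reproduction (table name `t` is illustrative).
    // Enable dynamic partition overwrite and a Hive-style limit of 3 partitions.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    spark.conf.set("spark.hive.exec.max.dynamic.partitions", "3")
    
    spark.sql("create table t(i int, p int) using parquet partitioned by (p)")
    
    // Creates 4 dynamic partitions, exceeding the limit of 3. For a data source
    // table this appears to succeed silently rather than throwing.
    spark.sql("insert overwrite table t partition(p) select 1, explode(array(1, 2, 3, 4))")
    spark.sql("show partitions t").show() // p=1, p=2, p=3, p=4
    ```
    
    If that reading is right, these limits are only checked on the Hive write path, which is why this PR proposes equivalent parameters for data source tables.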
   
