You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "lianjunzhi (Jira)" <ji...@apache.org> on 2021/04/08 05:46:00 UTC

[jira] [Updated] (SPARK-34985) Different execution plans under jdbc and hdfs

     [ https://issues.apache.org/jira/browse/SPARK-34985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

lianjunzhi updated SPARK-34985:
-------------------------------
    Description: 
Hive has two non-partitioned tables, trade_order and trade_order_goods. Trade_order contains four fields: trade_id, company_id, is_delete, and trade_status. trade_order_goods contains four fields: trade_id, cost, is_delete, and sell_total. Run the following code snippets:

 
|val df = spark.sql(|
|"""|
|select|
|b.company_id,|
|sum(a.cost) cost|
|FROM oms.trade_order_goods a|
|JOIN oms.trade_order b|
|ON a.trade_id = b.trade_id|
|WHERE a.is_delete = 0 AND b.is_delete = 0|
|GROUP BY|
|b.company_id|
|""".stripMargin)|
{quote}df.explain() //Physical Plan 1
{quote}
{quote}df.write.insertInto("oms.test") //Physical Plan 2
{quote}
{quote}df.write
 .format("jdbc")
 .option("url", "")
 .option("dbtable", "test")
 .option("user", "")
 .option("password", "")
 .option("driver", "com.mysql.jdbc.Driver")
 .option("truncate", value = true)
 .option("batchsize", 15000)
 .mode(SaveMode.Append)
 .save() //Physical Plan 3
{quote}
Physical Plan 1:
{quote}AdaptiveSparkPlan isFinalPlan=false
 +- HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)])
 +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#40|#40]
 +- HashAggregate(keys=[company_id#6L|#6L], functions=[partial_sum(cost#2)|#2)])
 +- Project [cost#2, company_id#6L|#2, company_id#6L]
 +- SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner
 :- Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0
 : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#32|#32]
 : +- Project [trade_id#1L, cost#2|#1L, cost#2]
 : +- Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
 : +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
 +- Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#33|#33]
 +- Project [trade_id#5L, company_id#6L|#5L, company_id#6L]
 +- Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
 +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{quote}
Physical Plan 2:
{quote}+- AdaptiveSparkPlan isFinalPlan=true
 +- *(6) HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)], output=[company_id#6L, cost#28|#6L, cost#28])
 +- CustomShuffleReader coalesced
 +- ShuffleQueryStage 2
 +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#244|#244]
 +- *(5) HashAggregate(keys=[company_id#6L|#6L], functions=[partial_sum(cost#2)|#2)], output=[company_id#6L, sum#21|#6L, sum#21])
 +- *(5) Project [cost#2, company_id#6L|#2, company_id#6L]
 +- *(5) SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner
 :- *(3) Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0
 : +- CustomShuffleReader coalesced
 : +- ShuffleQueryStage 0
 : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#119|#119]
 : +- *(1) Project [trade_id#1L, cost#2|#1L, cost#2]
 : +- *(1) Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
 : +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
 +- *(4) Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0
 +- CustomShuffleReader coalesced
 +- ShuffleQueryStage 1
 +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#126|#126]
 +- *(2) Project [trade_id#5L, company_id#6L|#5L, company_id#6L]
 +- *(2) Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
 +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{quote}
Physical Plan 3:
{quote}Execute SaveIntoDataSourceCommand
 +- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider@64ee110b, Map(url -> *********(redacted), truncate -> true, batchsize -> 15000, driver -> com.mysql.jdbc.Driver, dbtable -> test, user -> jkyun, password -> *********(redacted)), Append
 +- Aggregate [company_id#6L|#6L], [company_id#6L, sum(cost#2) AS cost#0|#6L, sum(cost#2) AS cost#0]
 +- Filter ((is_delete#3 = 0) AND (is_delete#7 = 0))
 +- Join Inner, (trade_id#1L = trade_id#5L)
 :- SubqueryAlias a
 : +- SubqueryAlias spark_catalog.oms.trade_order_goods
 : +- Relation[trade_id#1L,cost#2,is_delete#3,sell_total#4|#1L,cost#2,is_delete#3,sell_total#4] parquet
 +- SubqueryAlias b
 +- SubqueryAlias spark_catalog.oms.trade_order
 +- Relation[trade_id#5L,company_id#6L,is_delete#7,trade_status#8|#5L,company_id#6L,is_delete#7,trade_status#8] parquet
{quote}
As you can see, Physical Plan 3 does not have column pruning and predicate pushdown.

  was:
Hive has two non-partitioned tables, trade_order and trade_order_goods. Trade_order contains four fields: trade_id, company_id, is_delete, and trade_status. trade_order_goods contains four fields: trade_id, cost, is_delete, and sell_total. Run the following code snippets:
{quote} 
|val df = spark.sql(|
|"""|
|select|
|b.company_id,|
|sum(a.cost) cost|
|FROM oms.trade_order_goods a|
|JOIN oms.trade_order b|
|ON a.trade_id = b.trade_id|
|WHERE a.is_delete = 0 AND b.is_delete = 0|
|GROUP BY|
|b.company_id|
|""".stripMargin)|
{quote}
{quote}df.explain() //Physical Plan 1
{quote}
{quote}df.write.insertInto("oms.test") //Physical Plan 2
{quote}
{quote}df.write
 .format("jdbc")
 .option("url", "")
 .option("dbtable", "test")
 .option("user", "")
 .option("password", "")
 .option("driver", "com.mysql.jdbc.Driver")
 .option("truncate", value = true)
 .option("batchsize", 15000)
 .mode(SaveMode.Append)
 .save() //Physical Plan 3
{quote}
Physical Plan 1:
{quote}AdaptiveSparkPlan isFinalPlan=false
 +- HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)])
 +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#40|#40]
 +- HashAggregate(keys=[company_id#6L|#6L], functions=[partial_sum(cost#2)|#2)])
 +- Project [cost#2, company_id#6L|#2, company_id#6L]
 +- SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner
 :- Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0
 : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#32|#32]
 : +- Project [trade_id#1L, cost#2|#1L, cost#2]
 : +- Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
 : +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
 +- Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0
 +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#33|#33]
 +- Project [trade_id#5L, company_id#6L|#5L, company_id#6L]
 +- Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
 +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{quote}
Physical Plan 2:
{quote}+- AdaptiveSparkPlan isFinalPlan=true
 +- *(6) HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)], output=[company_id#6L, cost#28|#6L, cost#28])
 +- CustomShuffleReader coalesced
 +- ShuffleQueryStage 2
 +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#244|#244]
 +- *(5) HashAggregate(keys=[company_id#6L|#6L], functions=[partial_sum(cost#2)|#2)], output=[company_id#6L, sum#21|#6L, sum#21])
 +- *(5) Project [cost#2, company_id#6L|#2, company_id#6L]
 +- *(5) SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner
 :- *(3) Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0
 : +- CustomShuffleReader coalesced
 : +- ShuffleQueryStage 0
 : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#119|#119]
 : +- *(1) Project [trade_id#1L, cost#2|#1L, cost#2]
 : +- *(1) Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
 : +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
 +- *(4) Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0
 +- CustomShuffleReader coalesced
 +- ShuffleQueryStage 1
 +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#126|#126]
 +- *(2) Project [trade_id#5L, company_id#6L|#5L, company_id#6L]
 +- *(2) Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
 +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
{quote}
Physical Plan 3:
{quote}Execute SaveIntoDataSourceCommand
 +- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider@64ee110b, Map(url -> *********(redacted), truncate -> true, batchsize -> 15000, driver -> com.mysql.jdbc.Driver, dbtable -> test, user -> jkyun, password -> *********(redacted)), Append
 +- Aggregate [company_id#6L|#6L], [company_id#6L, sum(cost#2) AS cost#0|#6L, sum(cost#2) AS cost#0]
 +- Filter ((is_delete#3 = 0) AND (is_delete#7 = 0))
 +- Join Inner, (trade_id#1L = trade_id#5L)
 :- SubqueryAlias a
 : +- SubqueryAlias spark_catalog.oms.trade_order_goods
 : +- Relation[trade_id#1L,cost#2,is_delete#3,sell_total#4|#1L,cost#2,is_delete#3,sell_total#4] parquet
 +- SubqueryAlias b
 +- SubqueryAlias spark_catalog.oms.trade_order
 +- Relation[trade_id#5L,company_id#6L,is_delete#7,trade_status#8|#5L,company_id#6L,is_delete#7,trade_status#8] parquet
{quote}
As you can see, Physical Plan 3 does not have column pruning and predicate pushdown.


> Different execution plans under jdbc and hdfs
> ---------------------------------------------
>
>                 Key: SPARK-34985
>                 URL: https://issues.apache.org/jira/browse/SPARK-34985
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1
>         Environment: spark 3.0.1
> hive 2.1.1-cdh6.2.0
> hadoop 3.0.0-cdh6.2.0
>  
>            Reporter: lianjunzhi
>            Priority: Major
>
> Hive has two non-partitioned tables, trade_order and trade_order_goods. Trade_order contains four fields: trade_id, company_id, is_delete, and trade_status. trade_order_goods contains four fields: trade_id, cost, is_delete, and sell_total. Run the following code snippets:
>  
> |val df = spark.sql(|
> |"""|
> |select|
> |b.company_id,|
> |sum(a.cost) cost|
> |FROM oms.trade_order_goods a|
> |JOIN oms.trade_order b|
> |ON a.trade_id = b.trade_id|
> |WHERE a.is_delete = 0 AND b.is_delete = 0|
> |GROUP BY|
> |b.company_id|
> |""".stripMargin)|
> {quote}df.explain() //Physical Plan 1
> {quote}
> {quote}df.write.insertInto("oms.test") //Physical Plan 2
> {quote}
> {quote}df.write
>  .format("jdbc")
>  .option("url", "")
>  .option("dbtable", "test")
>  .option("user", "")
>  .option("password", "")
>  .option("driver", "com.mysql.jdbc.Driver")
>  .option("truncate", value = true)
>  .option("batchsize", 15000)
>  .mode(SaveMode.Append)
>  .save() //Physical Plan 3
> {quote}
> Physical Plan 1:
> {quote}AdaptiveSparkPlan isFinalPlan=false
>  +- HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)])
>  +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#40|#40]
>  +- HashAggregate(keys=[company_id#6L|#6L], functions=[partial_sum(cost#2)|#2)])
>  +- Project [cost#2, company_id#6L|#2, company_id#6L]
>  +- SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner
>  :- Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0
>  : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#32|#32]
>  : +- Project [trade_id#1L, cost#2|#1L, cost#2]
>  : +- Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
>  : +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
>  +- Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0
>  +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#33|#33]
>  +- Project [trade_id#5L, company_id#6L|#5L, company_id#6L]
>  +- Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
>  +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
> {quote}
> Physical Plan 2:
> {quote}+- AdaptiveSparkPlan isFinalPlan=true
>  +- *(6) HashAggregate(keys=[company_id#6L|#6L], functions=[sum(cost#2)|#2)], output=[company_id#6L, cost#28|#6L, cost#28])
>  +- CustomShuffleReader coalesced
>  +- ShuffleQueryStage 2
>  +- Exchange hashpartitioning(company_id#6L, 6), true, [id=#244|#244]
>  +- *(5) HashAggregate(keys=[company_id#6L|#6L], functions=[partial_sum(cost#2)|#2)], output=[company_id#6L, sum#21|#6L, sum#21])
>  +- *(5) Project [cost#2, company_id#6L|#2, company_id#6L]
>  +- *(5) SortMergeJoin [trade_id#1L|#1L], [trade_id#5L|#5L], Inner
>  :- *(3) Sort [trade_id#1L ASC NULLS FIRST|#1L ASC NULLS FIRST], false, 0
>  : +- CustomShuffleReader coalesced
>  : +- ShuffleQueryStage 0
>  : +- Exchange hashpartitioning(trade_id#1L, 6), true, [id=#119|#119]
>  : +- *(1) Project [trade_id#1L, cost#2|#1L, cost#2]
>  : +- *(1) Filter ((isnotnull(is_delete#3) AND (is_delete#3 = 0)) AND isnotnull(trade_id#1L))
>  : +- FileScan parquet oms.trade_order_goods[trade_id#1L,cost#2,is_delete#3|#1L,cost#2,is_delete#3] Batched: false, DataFilters: [isnotnull(is_delete#3), (is_delete#3 = 0), isnotnull(trade_id#1L)|#3), (is_delete#3 = 0), isnotnull(trade_id#1L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order_goods], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,cost:decimal(18,2),is_delete:int>
>  +- *(4) Sort [trade_id#5L ASC NULLS FIRST|#5L ASC NULLS FIRST], false, 0
>  +- CustomShuffleReader coalesced
>  +- ShuffleQueryStage 1
>  +- Exchange hashpartitioning(trade_id#5L, 6), true, [id=#126|#126]
>  +- *(2) Project [trade_id#5L, company_id#6L|#5L, company_id#6L]
>  +- *(2) Filter ((isnotnull(is_delete#7) AND (is_delete#7 = 0)) AND isnotnull(trade_id#5L))
>  +- FileScan parquet oms.trade_order[trade_id#5L,company_id#6L,is_delete#7|#5L,company_id#6L,is_delete#7] Batched: false, DataFilters: [isnotnull(is_delete#7), (is_delete#7 = 0), isnotnull(trade_id#5L)|#7), (is_delete#7 = 0), isnotnull(trade_id#5L)], Format: Parquet, Location: InMemoryFileIndex[hdfs://nameservice1/user/hive/warehouse/oms.db/trade_order], PartitionFilters: [], PushedFilters: [IsNotNull(is_delete), EqualTo(is_delete,0), IsNotNull(trade_id)], ReadSchema: struct<trade_id:bigint,company_id:bigint,is_delete:int>
> {quote}
> Physical Plan 3:
> {quote}Execute SaveIntoDataSourceCommand
>  +- SaveIntoDataSourceCommand org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider@64ee110b, Map(url -> *********(redacted), truncate -> true, batchsize -> 15000, driver -> com.mysql.jdbc.Driver, dbtable -> test, user -> jkyun, password -> *********(redacted)), Append
>  +- Aggregate [company_id#6L|#6L], [company_id#6L, sum(cost#2) AS cost#0|#6L, sum(cost#2) AS cost#0]
>  +- Filter ((is_delete#3 = 0) AND (is_delete#7 = 0))
>  +- Join Inner, (trade_id#1L = trade_id#5L)
>  :- SubqueryAlias a
>  : +- SubqueryAlias spark_catalog.oms.trade_order_goods
>  : +- Relation[trade_id#1L,cost#2,is_delete#3,sell_total#4|#1L,cost#2,is_delete#3,sell_total#4] parquet
>  +- SubqueryAlias b
>  +- SubqueryAlias spark_catalog.oms.trade_order
>  +- Relation[trade_id#5L,company_id#6L,is_delete#7,trade_status#8|#5L,company_id#6L,is_delete#7,trade_status#8] parquet
> {quote}
> As you can see, Physical Plan 3 does not have column pruning and predicate pushdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org