Posted to issues@spark.apache.org by "SAVIO SALVARINO TELES DE OLIVEIRA (Jira)" <ji...@apache.org> on 2022/06/26 13:26:00 UTC

[jira] [Commented] (SPARK-38288) Aggregate push down doesnt work using Spark SQL jdbc datasource with postgresql

    [ https://issues.apache.org/jira/browse/SPARK-38288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17558879#comment-17558879 ] 

SAVIO SALVARINO TELES DE OLIVEIRA commented on SPARK-38288:
-----------------------------------------------------------

I have the same problem with Spark 3.2.1.

Code to read the orders table from PostgreSQL:

 
{code:python}
orders = (
    spark.read.format('jdbc')
    .option('url', 'jdbc:postgresql://...')
    .option('driver', 'org.postgresql.Driver')
    .option('dbtable', 'orders')
    .option('user', '***')
    .option('password', '****')
    .option('pushDownAggregate', 'true')
    .load()
)
{code}
 

Now, I'm trying to group by two columns (owner_name and client_id):
{code:python}
from pyspark.sql import functions as F  # F.max is Spark's aggregate, not Python's builtin max
orders.groupby("owner_name", "client_id").agg(F.max('order_date').alias("max_order_date")).limit(10).explain('extended'){code}
 

But the optimized plan still uses Relation instead of RelationV2:

 
{code:java}
== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Aggregate [owner_name#2717, client_id#2724], [owner_name#2717, client_id#2724, max(order_date#2728) AS max_order_date#2910]
      +- Project [owner_name#2717, client_id#2724, order_date#2728]
         +- Relation [owner_name#2717,client_id#2724,order_date#2728] JDBCRelation(orders) [numPartitions=1]{code}
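
The same check can be done without reading the plan by eye. Below is a minimal sketch, assuming the plan is available as a plain string and that the node names are printed as in the output above (`Relation [` for the V1 path, `RelationV2` for the DataSource V2 path). `relation_kinds` is a hypothetical helper name, not part of Spark.

```python
import re

# Node names as they appear in the explain() output above. Treating
# "RelationV2" as the marker of the DataSource V2 path is an assumption
# taken from this comment; the helper itself is hypothetical.
V1_NODE = re.compile(r"\bRelation\s*\[")
V2_NODE = re.compile(r"\bRelationV2\b")


def relation_kinds(plan_text: str) -> set:
    """Return the kinds of relation nodes ("v1", "v2") found in a plan string."""
    kinds = set()
    if V1_NODE.search(plan_text):
        kinds.add("v1")
    if V2_NODE.search(plan_text):
        kinds.add("v2")
    return kinds
```

In PySpark the plan string could be obtained with `orders._jdf.queryExecution().optimizedPlan().toString()`; this goes through the internal `_jdf` handle, so it may not be stable across versions.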
 

> Aggregate push down doesnt work using Spark SQL jdbc datasource with postgresql
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-38288
>                 URL: https://issues.apache.org/jira/browse/SPARK-38288
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Luis Lozano Coira
>            Priority: Major
>              Labels: DataSource, Spark-SQL
>
> I am establishing a connection with postgresql using the Spark SQL jdbc datasource. I have started the spark shell including the postgres driver and I can connect and execute queries without problems. I am using this statement:
> {code:java}
> val df = spark.read.format("jdbc").option("url", "jdbc:postgresql://host:port/").option("driver", "org.postgresql.Driver").option("dbtable", "test").option("user", "postgres").option("password", "*******").option("pushDownAggregate",true).load()
> {code}
> I am adding the pushDownAggregate option because I would like the aggregations to be delegated to the source. But for some reason this is not happening.
> Reviewing this pull request, it seems that this feature should be merged into 3.2. [https://github.com/apache/spark/pull/29695]
> I am running the aggregations within the limitations mentioned there. An example case where I don't see pushdown being done would be this one:
> {code:java}
> df.groupBy("name").max("age").show()
> {code}
> The results of the queryExecution are shown below:
> {code:java}
> scala> df.groupBy("name").max("age").queryExecution.executedPlan
> res19: org.apache.spark.sql.execution.SparkPlan =
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
>    +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
>       +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
>          +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>
> scala> dfp.groupBy("name").max("age").queryExecution.toString
> res20: String =
> "== Parsed Logical Plan ==
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#246] JDBCRelation(test) [numPartitions=1]
> == Analyzed Logical Plan ==
> name: string, max(age): int
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#24...
> {code}
> What could be the problem? Should pushDownAggregate work in this case?
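
In the executed plan quoted above, the empty `PushedAggregates: []` and `PushedGroupby: []` entries on the scan node are what show that nothing reached the database. A small sketch that extracts those entries from a plan string, assuming the `Pushed...: [...]` layout seen in that output; `pushed_fields` and `aggregate_was_pushed` are hypothetical helper names, not Spark APIs.

```python
import re

# Matches entries such as "PushedAggregates: []" or "PushedGroupby: [name]".
# Assumes no "]" occurs inside an entry; the layout is taken from the plan
# output quoted above and may differ in other Spark versions.
PUSHED_FIELD = re.compile(r"(Pushed\w+): \[([^\]]*)\]")


def pushed_fields(plan_text: str) -> dict:
    """Map each Pushed* label in the plan text to its list of entries."""
    result = {}
    for name, body in PUSHED_FIELD.findall(plan_text):
        # Naive comma split: entries that themselves contain commas
        # would be broken into several pieces.
        result[name] = [item.strip() for item in body.split(",") if item.strip()]
    return result


def aggregate_was_pushed(plan_text: str) -> bool:
    """True only if a PushedAggregates entry exists and is non-empty."""
    return bool(pushed_fields(plan_text).get("PushedAggregates"))
```

Calling `aggregate_was_pushed` on the executed plan string gives a quick yes/no signal that could be asserted in a test when trying different reader options.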



