You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Luis Lozano Coira (Jira)" <ji...@apache.org> on 2022/02/22 12:52:00 UTC

[jira] [Updated] (SPARK-38288) Aggregate push down doesnt work using Spark SQL jdbc datasource with postgresql

     [ https://issues.apache.org/jira/browse/SPARK-38288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luis Lozano Coira updated SPARK-38288:
--------------------------------------
    Description: 
I am establishing a connection with postgresql using the Spark SQL jdbc datasource. I have started the spark shell including the postgres driver and I can connect and execute queries without problems. I am using this statement:
{code:java}
val df = spark.read.format("jdbc").option("url", "jdbc:postgresql://host:port/").option("driver", "org.postgresql.Driver").option("dbtable", "test").option("user", "postgres").option("password", "*******").option("pushDownAggregate",true).load()
{code}
I am adding the pushDownAggregate option because I would like the aggregations are delegated to the source. But for some reason this is not happening.

Reviewing this pull request, it seems that this feature should be merged into 3.2. [https://github.com/apache/spark/pull/29695]

I am making the aggregations considering the mentioned limitations. An example case where I don't see pushdown being done would be this one:
{code:java}
df.groupBy("name").max("age").show()
{code}

The results of the queryExecution are shown below:

{code:java}
scala> df.groupBy("name").max("age").queryExecution.executedPlan
res19: org.apache.spark.sql.execution.SparkPlan =
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
   +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
      +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
         +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>

scala> dfp.groupBy("name").max("age").queryExecution.toString
res20: String =
"== Parsed Logical Plan ==
Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
+- Relation [age#246] JDBCRelation(test) [numPartitions=1]

== Analyzed Logical Plan ==
name: string, max(age): int
Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
+- Relation [age#24...
{code}

What could be the problem? Should pushDownAggregate work in this case?

  was:
I am establishing a connection with postgresql using the Spark SQL jdbc datasource. I have started the spark shell including the postgres driver and I can connect and execute queries without problems. I am using this statement:
{code:java}
val df = spark.read.format("jdbc").option("url", "jdbc:postgresql://host:port/").option("driver", "org.postgresql.Driver").option("dbtable", "test").option("user", "postgres").option("password", "*******").option("pushDownAggregate",true).load()
{code}
I am adding the pushDownAggregate option because I would like the aggregations are delegated to the source. But for some reason this is not happening.

Reviewing this pull request, it seems that this feature should be merged into 3.2. [https://github.com/apache/spark/pull/29695]

I am making the aggregations considering the mentioned limitations. An example case where I don't see pushdown being done would be this one:
{code:java}
df.groupBy("name").max("age").show()
{code}
What could be the problem? Should pushDownAggregate work in this case?


> Aggregate push down doesnt work using Spark SQL jdbc datasource with postgresql
> -------------------------------------------------------------------------------
>
>                 Key: SPARK-38288
>                 URL: https://issues.apache.org/jira/browse/SPARK-38288
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: Luis Lozano Coira
>            Priority: Major
>              Labels: DataSource, Spark-SQL
>
> I am establishing a connection with postgresql using the Spark SQL jdbc datasource. I have started the spark shell including the postgres driver and I can connect and execute queries without problems. I am using this statement:
> {code:java}
> val df = spark.read.format("jdbc").option("url", "jdbc:postgresql://host:port/").option("driver", "org.postgresql.Driver").option("dbtable", "test").option("user", "postgres").option("password", "*******").option("pushDownAggregate",true).load()
> {code}
> I am adding the pushDownAggregate option because I would like the aggregations are delegated to the source. But for some reason this is not happening.
> Reviewing this pull request, it seems that this feature should be merged into 3.2. [https://github.com/apache/spark/pull/29695]
> I am making the aggregations considering the mentioned limitations. An example case where I don't see pushdown being done would be this one:
> {code:java}
> df.groupBy("name").max("age").show()
> {code}
> The results of the queryExecution are shown below:
> {code:java}
> scala> df.groupBy("name").max("age").queryExecution.executedPlan
> res19: org.apache.spark.sql.execution.SparkPlan =
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[name#274], functions=[max(age#246)], output=[name#274, max(age)#544])
>    +- Exchange hashpartitioning(name#274, 200), ENSURE_REQUIREMENTS, [id=#205]
>       +- HashAggregate(keys=[name#274], functions=[partial_max(age#246)], output=[name#274, max#548])
>          +- Scan JDBCRelation(test) [numPartitions=1] [age#246,name#274] PushedAggregates: [], PushedFilters: [], PushedGroupby: [], ReadSchema: struct<age:int,name:string>
> scala> dfp.groupBy("name").max("age").queryExecution.toString
> res20: String =
> "== Parsed Logical Plan ==
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#246] JDBCRelation(test) [numPartitions=1]
> == Analyzed Logical Plan ==
> name: string, max(age): int
> Aggregate [name#274], [name#274, max(age#246) AS max(age)#581]
> +- Relation [age#24...
> {code}
> What could be the problem? Should pushDownAggregate work in this case?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org