Posted to issues@spark.apache.org by "Saurabh Chawla (Jira)" <ji...@apache.org> on 2021/07/23 08:35:00 UTC

[jira] [Commented] (SPARK-35933) PartitionFilters and pushFilters not applied to window functions

    [ https://issues.apache.org/jira/browse/SPARK-35933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17386086#comment-17386086 ] 

Saurabh Chawla commented on SPARK-35933:
----------------------------------------

This is not a bug; it is the intended behavior of filter pushdown in the case of window operators:

{code:java}
// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
{code}
 

In this case, condition 1 is not satisfied: the predicate "id" === 1 is not part of the window partitioning key, since the window is defined as Window.partitionBy("name").orderBy("ts"). The filter therefore cannot be pushed beneath the Window operator.

See this PR for reference: [https://github.com/apache/spark/pull/11635]

And this unit test: [https://github.com/apache/spark/blob/master/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1075]
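
The core of condition 1 can be illustrated with a simplified sketch in plain Scala (this is not the actual optimizer code; canPushThroughWindow is a hypothetical helper, and the real rule additionally requires the predicate to be deterministic and placed before any non-deterministic predicates):

{code:java}
// Simplified, hypothetical sketch of the pushdown condition: a predicate may
// be pushed beneath a Window only if every column it references belongs to
// the window partitioning key.
def canPushThroughWindow(predicateColumns: Set[String],
                         partitionKey: Set[String]): Boolean =
  predicateColumns.subsetOf(partitionKey)

// The reported query: Window.partitionBy("name"), but the filter is on "id"
canPushThroughWindow(Set("id"), Set("name"))   // false -> filter stays above the Window
// A filter on the partitioning key itself would qualify:
canPushThroughWindow(Set("name"), Set("name")) // true
{code}

By the same rule, a filter such as $"name" === "Andy" (referencing only the partitioning key) could be pushed beneath the window, while the reported filter on "id" cannot.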

 

 

> PartitionFilters and pushFilters not applied to window functions
> ----------------------------------------------------------------
>
>                 Key: SPARK-35933
>                 URL: https://issues.apache.org/jira/browse/SPARK-35933
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.8, 3.1.2
>            Reporter: Shreyas Kothavade
>            Priority: Major
>
> Spark does not apply partition and pushed filters when the partition by column and window function partition columns are not the same. For example, in the code below, the data frame is created with a partition on "id". And I use the partitioned data frame to calculate lag which is partitioned by "name". In this case, the query plan shows the partitionFilters and pushed Filters as empty.
> {code:java}
> spark
>   .createDataFrame(
>     Seq(
>       Person(
>         1,
>         "Andy",
>         new Timestamp(1499955986039L),
>         new Timestamp(1499955982342L)
>       ),
>       Person(
>         2,
>         "Jeff",
>         new Timestamp(1499955986339L),
>         new Timestamp(1499955986666L)
>       )
>     )
>   )
>   .write
>   .partitionBy("id")
>   .mode(SaveMode.Append)
>   .parquet("spark-warehouse/people")
> val dfPeople =
>   spark.read.parquet("spark-warehouse/people")
> dfPeople
>   .select(
>     $"id",
>     $"name",
>     lag(col("ts2"), 1).over(Window.partitionBy("name").orderBy("ts"))
>   )
>   .filter($"id" === 1)
>   .explain()
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org