Posted to user@spark.apache.org by Hugh Hyndman <hu...@redleaf.ca> on 2018/09/09 10:56:14 UTC

PushDown Filter Not Reset

Hi,

I am developing my own data source reader and have implemented pushdown filters. I am struggling with one case: when a query has no filter, I never get a call to pushFilters(), so I have no way to reset a filter pushed by an earlier query. Here is a Spark shell session demonstrating the issue, with debug log statements showing the call invocations.
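For reference, the reader follows the usual Spark 2.3 DataSourceV2 pattern. This is a simplified, hypothetical sketch (the class body and the isSupported() helper are placeholders, not my actual code):

```scala
import java.util.Collections

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.sources.v2.reader.{
  DataReaderFactory, DataSourceReader, SupportsPushDownFilters
}
import org.apache.spark.sql.types.StructType

// Simplified sketch of the reader. The pushed filters live in mutable
// state on the reader instance, which is what makes a missed "reset"
// call a problem: state from one query leaks into the next.
class MyDataSourceReader(schema: StructType) extends DataSourceReader
    with SupportsPushDownFilters {

  private var pushed: Array[Filter] = Array.empty

  override def readSchema(): StructType = schema

  // Spark hands over the query's filters; keep the ones the source can
  // evaluate remotely and return the rest for Spark to apply itself.
  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    val (supported, unsupported) = filters.partition(isSupported)
    pushed = supported
    unsupported
  }

  override def pushedFilters(): Array[Filter] = pushed

  private def isSupported(f: Filter): Boolean = true // source-specific check

  // Actual scan construction omitted for brevity.
  override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] =
    Collections.emptyList()
}
```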

1) Initial unfiltered load/show. Note, data source provides schema.

scala> val df = spark.read.format("MyDataSource").
    option("function", "testSpartan").
    option("loglevel", "debug").load
18/08/26 07:42:56.580 DEBUG dr: MyDataSourceReader()
18/08/26 07:42:56.580 DEBUG dr:   function: testSpartan
18/08/26 07:42:56.580 DEBUG dr:   loglevel: debug
df: org.apache.spark.sql.DataFrame = [jcolumn: bigint, ccolumn: string]

scala> df.show
18/08/26 07:43:33.659 DEBUG dr: pruneColums()
18/08/26 07:43:33.659 DEBUG dr:   StructField(jcolumn,LongType,false)
18/08/26 07:43:33.659 DEBUG dr:   StructField(ccolumn,StringType,false)
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.678 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:43:33.699 DEBUG dr: next()
18/08/26 07:43:33.701 DEBUG dr: get()
18/08/26 07:43:33.701 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
|      0|      a|
|      1|      b|
|      2|      c|
|      3|      a|
|      4|      b|
|      5|      c|
|      6|      a|
|      7|      b|
|      8|      c|
|      9|      a|
+-------+-------+

2) Call with a simple filter. Note the call to pushFilters().

scala> df.filter("jcolumn<2").show
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushFilters()
18/08/26 07:45:42.501 DEBUG dr:   LessThan(jcolumn,2)
18/08/26 07:45:42.501 DEBUG dr: pruneColums()
18/08/26 07:45:42.501 DEBUG dr:   StructField(jcolumn,LongType,false)
18/08/26 07:45:42.501 DEBUG dr:   StructField(ccolumn,StringType,false)
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.512 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:45:42.529 DEBUG dr: next()
18/08/26 07:45:42.532 DEBUG dr: get()
18/08/26 07:45:42.532 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
|      0|      a|
|      1|      b|
+-------+-------+
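As an aside on how the pushed filter is consumed: the source translates the Filter objects it accepted into its own predicate syntax. A hypothetical helper, simplified to a few operators (toPredicate is an illustrative name, not part of the Spark API):

```scala
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan, LessThan}

// Hypothetical translation of a pushed Filter into a source-side
// predicate string. Returning None means "unsupported": pushFilters()
// should hand such a filter back to Spark to evaluate.
def toPredicate(f: Filter): Option[String] = f match {
  case LessThan(attr, value)    => Some(s"$attr < $value")
  case GreaterThan(attr, value) => Some(s"$attr > $value")
  case EqualTo(attr, value)     => Some(s"$attr = $value")
  case _                        => None
}
```

For example, the LessThan(jcolumn,2) in the log above would become the predicate "jcolumn < 2".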

3) Subsequent call with no filter. Note that I do not get a call to pushFilters() with an empty Filter array, and the result below is still filtered. I am unsure what "signal" I should receive in order to reset the previously pushed filters.

scala> df.show
18/08/26 07:46:21.442 DEBUG dr: pruneColums()
18/08/26 07:46:21.442 DEBUG dr:   StructField(jcolumn,LongType,false)
18/08/26 07:46:21.442 DEBUG dr:   StructField(ccolumn,StringType,false)
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.452 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:46:21.468 DEBUG dr: next()
18/08/26 07:46:21.470 DEBUG dr: get()
18/08/26 07:46:21.471 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
|      0|      a|
|      1|      b|
+-------+-------+
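To make the lifecycle concrete, here is a plain-Scala simulation of what I believe is happening (no Spark dependency; FakeReader and the string filters are obviously hypothetical): the reader instance is created once at load() and then reused, and because pushFilters() is only invoked when a query actually has filters, the third query scans with the second query's filter still in place.

```scala
// Plain-Scala simulation of the suspected lifecycle (no Spark needed).
// Assumption: Spark reuses the single reader created at load() and only
// calls pushFilters() when the query has filters to push.
object PushdownSimulation {
  class FakeReader {
    var pushed: Seq[String] = Seq.empty

    def pushFilters(filters: Seq[String]): Seq[String] = {
      pushed = filters // remember the supported filters
      Seq.empty        // pretend every filter is supported
    }

    def scan(data: Seq[Int]): Seq[Int] =
      data.filter { v =>
        pushed.forall {
          case "jcolumn<2" => v < 2
          case _           => true
        }
      }
  }

  def main(args: Array[String]): Unit = {
    val reader = new FakeReader        // constructed once, at load()
    val data = (0 to 9).toSeq

    println(reader.scan(data).size)    // query 1: no filter -> 10 rows
    reader.pushFilters(Seq("jcolumn<2"))
    println(reader.scan(data).size)    // query 2: filtered  -> 2 rows
    println(reader.scan(data).size)    // query 3: no filter, but the stale
                                       // filter still applies -> 2 rows
  }
}
```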
Thanks

/Hugh