Posted to user@spark.apache.org by Alexi Kostibas <ak...@nuna.com.INVALID> on 2017/02/09 00:56:58 UTC

Strange behavior with 'not' and filter pushdown

Hi,

I have an application where I’m filtering data with Spark SQL using simple WHERE clauses. I also want the ability to show the unmatched rows for any filter, so I’m wrapping the previous clause in `NOT()` to get the inverse. Example:

Filter:  username is not null
Inverse filter:  NOT(username is not null)

This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse filter always returns zero results. It looks like this is a problem with how the filter is getting pushed down to Parquet. Specifically, the pushdown includes both the “is not null” filter, AND “not(is not null)”, which would obviously result in zero matches. An example below:

pyspark:
> x = spark.sql('select my_id from my_table where username is not null')
> y = spark.sql('select my_id from my_table where not(username is not null)')                                                                                                                            
> x.explain()
== Physical Plan ==
*Project [my_id#6L]
+- *Filter isnotnull(username#91)
   +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
       Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
       PartitionFilters: [], PushedFilters: [IsNotNull(username)],
       ReadSchema: struct<my_id:bigint,username:string>
> y.explain()
== Physical Plan ==
*Project [my_id#6L]
+- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
   +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
       Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
       PartitionFilters: [],
       PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
       ReadSchema: struct<my_id:bigint,username:string>

Presently I’m working around this by using the new functionality of NOT EXISTS in Spark 2, but that seems like overkill.
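
For this particular clause, stating the inverse directly as IS NULL may avoid the contradictory pushdown entirely. A rough sketch in Scala (hypothetical, and unverified on 2.0.2):

// Rough sketch (unverified on 2.0.2): for this specific clause, express the
// inverse directly so only IsNull(username) is pushed down instead of the
// contradictory IsNotNull/Not(IsNotNull) pair.
val yAlt = spark.sql("select my_id from my_table where username is null")
yAlt.explain()  // expected PushedFilters: [IsNull(username)]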

Any help appreciated.

Alexi Kostibas
Engineering
Nuna
650 Townsend Street, Suite 425
San Francisco, CA 94103


Re: Strange behavior with 'not' and filter pushdown

Posted by Takeshi Yamamuro <li...@gmail.com>.
Oh, thanks for checking!


-- 
---
Takeshi Yamamuro

Re: Strange behavior with 'not' and filter pushdown

Posted by Xiao Li <ga...@gmail.com>.
https://github.com/apache/spark/pull/16894

Already backported to Spark 2.0

Thanks!

Xiao


Re: Strange behavior with 'not' and filter pushdown

Posted by Takeshi Yamamuro <li...@gmail.com>.
cc: xiao

IIUC, xiao's commit below fixed this issue in master.
https://github.com/apache/spark/commit/2eb093decb5e87a1ea71bbaa28092876a8c84996

Is this fix worth backporting to the v2.0 branch?
I checked that I could reproduce the issue there:

---

scala> Seq((1, "a"), (2, "b"), (3, null)).toDF("c0", "c1").write.mode("overwrite").parquet("/Users/maropu/Desktop/data")
scala> spark.read.parquet("/Users/maropu/Desktop/data").createOrReplaceTempView("t")
scala> val df = sql("SELECT c0 FROM t WHERE NOT(c1 IS NOT NULL)")
scala> df.explain(true)
== Parsed Logical Plan ==
'Project ['c0]
+- 'Filter NOT isnotnull('c1)
   +- 'UnresolvedRelation `t`

== Analyzed Logical Plan ==
c0: int
Project [c0#16]
+- Filter NOT isnotnull(c1#17)
   +- SubqueryAlias t
      +- Relation[c0#16,c1#17] parquet

== Optimized Logical Plan ==
Project [c0#16]
+- Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
           ^^^^^^^^^^^^^^^^
   +- Relation[c0#16,c1#17] parquet

== Physical Plan ==
*Project [c0#16]
+- *Filter (isnotnull(c1#17) && NOT isnotnull(c1#17))
   +- *BatchedScan parquet [c0#16,c1#17] Format: ParquetFormat, InputPaths:
file:/Users/maropu/Desktop/data, PartitionFilters: [], PushedFilters:
[IsNotNull(c1), Not(IsNotNull(c1))], ReadSchema: struct<c0:int,c1:string>

scala> df.show
+---+
| c0|
+---+
+---+
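
If the fix works as intended, re-running the same query should return the single row whose c1 is null (c0 = 3). A quick check along these lines (hypothetical, not from this session):

scala> assert(df.collect().map(_.getInt(0)).toSeq == Seq(3))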




// maropu



-- 
---
Takeshi Yamamuro

Re: Strange behavior with 'not' and filter pushdown

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
Wrapping this up -- fix is in 2.1.0 and has been backported to the 2.0.x
branch, as well.
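
For anyone who wants to confirm the fix on their own build, a rough check in Scala (hypothetical snippet, assuming the temp view "t" from the repro earlier in the thread):

// On a fixed build, the scan should no longer push the contradictory
// IsNotNull/Not(IsNotNull) pair for the inverse filter.
val plan = spark.sql("SELECT c0 FROM t WHERE NOT(c1 IS NOT NULL)")
  .queryExecution.executedPlan.toString
assert(!plan.contains("IsNotNull(c1), Not(IsNotNull(c1))"))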


Re: Strange behavior with 'not' and filter pushdown

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
Went ahead and opened

https://issues.apache.org/jira/browse/SPARK-19586

though I'd generally expect to just close it as fixed in 2.1.0 and roll on.


Re: Strange behavior with 'not' and filter pushdown

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
On the plus side, looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
      +- *Project
         +- *Filter NOT isnotnull(username#14)
            +- *FileScan parquet [username#14] Batched: true, Format:
Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
ReadSchema: struct<username:string>




Re: Strange behavior with 'not' and filter pushdown

Posted by Everett Anderson <ev...@nuna.com.INVALID>.
Bumping this thread.

Translating "where not(username is not null)" into a filter of
[IsNotNull(username),
Not(IsNotNull(username))] seems like a rather severe bug.

Spark 1.6.2:

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
 +- TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
 +- Project
 +- Filter NOT isnotnull(username#1590)
 +- Scan ParquetRelation[username#1590] InputPaths: <path to parquet>,
PushedFilters: [Not(IsNotNull(username))]

Spark 2.0.2

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
 +- *HashAggregate(keys=[], functions=[partial_count(1)])
 +- *Project
 +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
 +- *BatchedScan parquet default.<hive table name>[username#35] Format:
ParquetFormat, InputPaths: <path to parquet>, PartitionFilters: [],
PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema:
struct<username:string>

Example to generate the above:

// Create some fake data

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
    Row(1, "fred"),
    Row(2, "amy"),
    Row(3, null)))

val schema = StructType(Seq(
    StructField("id", IntegerType, nullable = true),
    StructField("username", StringType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)

val path = "SOME PATH HERE"

data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)

testData.registerTempTable("filter_test_table")


%sql
explain select count(*) from filter_test_table where not( username is not
null)
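
A quick sanity check against the same table (hypothetical, not from my session): the sample data has exactly one row (id = 3) with a null username, so the inverse filter should count 1; an affected 2.0.2 build returns 0 instead.

// Hypothetical check: expected count is 1 once the pushdown bug is fixed.
val n = sqlContext
  .sql("select count(*) from filter_test_table where not(username is not null)")
  .collect()(0).getLong(0)
println(s"inverse-filter count = $n")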

