Posted to dev@spark.apache.org by Nikolas Vanderhoof <ni...@gmail.com> on 2019/04/04 02:13:20 UTC

Resolving generated expressions in catalyst

Hey everyone!

I'm trying to implement a custom Catalyst optimization that I think may be
useful to others who make frequent use of the arrays_overlap and
array_contains functions in joins.



Consider this first query joining on overlapping arrays.
```
import org.apache.spark.sql.functions._

val left = Seq((Seq(1, 2, 3, 4), "hi")).toDF("arr", "word")
val right = Seq((Seq(2, 5), "bye")).toDF("arr", "word")

// This results in a cartesian product in the physical plan if the tables
// are sufficiently large
val naiveJoin = left.join(right, arrays_overlap(left("arr"), right("arr")))
```

We can transform it into a query like this, which joins on the exploded
array elements instead:

// This will result in a non-cartesian product join
val fastJoin = {
  left.withColumn("explode_larr", explode(left("arr"))).as("__lp").join(
    right.withColumn("explode_rarr", explode(col("arr"))).as("__rp"),
    col("explode_larr") === col("explode_rarr")
  ).drop("explode_larr", "explode_rarr").distinct
}
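
As a sanity check, here is a small sketch that compares the two joins on a tiny dataset. The extra rows, the local `SparkSession` setup, and the app name are illustrative assumptions, not part of my actual code. It only checks that both queries return the same set of rows on this particular input.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumed local session for illustration; in spark-shell this returns the existing one.
val spark = SparkSession.builder().master("local[*]").appName("overlap-join-check").getOrCreate()
import spark.implicits._

// Hypothetical sample data: "hi" overlaps "ciao" on two elements (2 and 3),
// which is the case that makes the final distinct necessary.
val left = Seq((Seq(1, 2, 3, 4), "hi"), (Seq(7, 8), "yo")).toDF("arr", "word")
val right = Seq((Seq(2, 5), "bye"), (Seq(2, 3), "ciao")).toDF("arr", "word")

val naiveJoin = left.join(right, arrays_overlap(left("arr"), right("arr")))

val fastJoin = left.withColumn("explode_larr", explode(left("arr"))).as("__lp")
  .join(
    right.withColumn("explode_rarr", explode(right("arr"))).as("__rp"),
    col("explode_larr") === col("explode_rarr"))
  .drop("explode_larr", "explode_rarr")
  .distinct

// Compare the results as sets of rows (row order is not guaranteed).
val naiveRows = naiveJoin.collect().map(_.toSeq).toSet
val fastRows = fastJoin.collect().map(_.toSeq).toSet
assert(naiveRows == fastRows)

// Print the physical plans to compare the join strategies chosen.
naiveJoin.explain()
fastJoin.explain()
```

One caveat with this rewrite: the final distinct also collapses rows that were already duplicated in the inputs, so the two queries should only be expected to agree when the input rows are unique.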



I've implemented a first attempt at this optimization on my fork:
https://github.com/nvander1/spark/commit/711184f98774b7ac46fcfdf4e28e2d71041d89e1
but I'm having difficulty figuring out how to resolve my attributes
on the exploded columns.

Examining the logical tree of fastJoin:
00 Deduplicate [arr#617, arr#626, word#618, word#627]
01 +- Project [arr#617, word#618, arr#626, word#627]
02    +- Join Inner, (explode_larr#643 = explode_rarr#648)
03       :- SubqueryAlias `__lp`
04       :  +- Project [arr#617, word#618, explode_larr#643]
05       :     +- Generate explode(arr#617), false, [explode_larr#643]
06       :        +- Project [_1#614 AS arr#617, _2#615 AS word#618]
07       :           +- LocalRelation [_1#614, _2#615]
08       +- SubqueryAlias `__rp`
09          +- Project [arr#626, word#627, explode_rarr#648]
10             +- Generate explode(arr#626), false, [explode_rarr#648]
11                +- Project [_1#623 AS arr#626, _2#624 AS word#627]
12                   +- LocalRelation [_1#623, _2#624]
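
For anyone who wants to reproduce a tree like this, the following is a minimal sketch of how a numbered plan can be printed. I'm assuming the analyzed plan is the relevant one here, since the attributes carry expression IDs and the SubqueryAlias nodes are still present. The session setup and the single-table example are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumed local session for illustration.
val spark = SparkSession.builder().master("local[*]").appName("plan-inspection").getOrCreate()
import spark.implicits._

// One side of the join only, to keep the tree short.
val df = Seq((Seq(1, 2, 3, 4), "hi")).toDF("arr", "word")
  .withColumn("explode_larr", explode(col("arr")))

// The analyzed logical plan: every attribute has been bound to an expression ID.
val analyzed = df.queryExecution.analyzed
println(analyzed.numberedTreeString)

// For comparison, the plan after the optimizer has run.
println(df.queryExecution.optimizedPlan.treeString)
```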



This is the logical tree of my implementation thus far:
'Deduplicate
+- 'Project [arr#2143, word#2144, arr#2152, word#2153]
   +- 'Join Inner, ('explode_larr = 'explode_rarr)
      :- 'SubqueryAlias `__lp`
      :  +- 'Project [arr#2143, word#2144, 'explode_larr]
      :     +- 'Generate explode(arr#2143), false, explode_larr
      :        +- LocalRelation [arr#2143, word#2144]
      +- 'SubqueryAlias `__rp`
         +- 'Project [arr#2152, word#2153, 'explode_rarr]
            +- 'Generate explode(arr#2152), false, explode_rarr
               +- LocalRelation [arr#2152, word#2153]
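
The leading ' on a node or attribute marks it as unresolved. For comparison, here is a minimal sketch of a hand-built Generate node that reports itself as resolved. It is only an illustration under assumptions and is not taken from the commit above: the relation, the attribute names, and the `Generate` constructor arguments follow my reading of the Catalyst source and may differ between Spark versions. The idea is to create the `AttributeReference` for the exploded column up front and reuse that same instance in both the `Generate` output and the parent `Project`, instead of referring to the column by name.

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Explode}
import org.apache.spark.sql.catalyst.plans.logical.{Generate, LocalRelation, Project}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType}

// Hypothetical child relation with the same shape as the example tables.
val arr = AttributeReference("arr", ArrayType(IntegerType, containsNull = false))()
val word = AttributeReference("word", StringType)()
val child = LocalRelation(arr, word)

// Create the generator output attribute explicitly; the empty second argument
// list lets Catalyst assign a fresh expression ID.
val exploded = AttributeReference("explode_larr", IntegerType, nullable = false)()

// Constructor arguments as I understand them; treat the exact signature as an assumption.
val gen = Generate(
  generator = Explode(arr),
  unrequiredChildIndex = Nil,
  outer = false,
  qualifier = None,
  generatorOutput = Seq(exploded),
  child = child)

// Reuse the same attribute instance in the parent, so nothing is looked up by name.
val project = Project(Seq(arr, word, exploded), gen)

assert(gen.resolved)
assert(project.resolved)
println(project.treeString)
```

Whether this fits my rule depends on where it runs: as far as I understand, the optimizer operates on an already analyzed plan, so a rule there would need to emit resolved nodes like these itself.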



Related information (similar cases):
https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27359?filter=addedrecently