Posted to issues@spark.apache.org by "yahsuan, chang (JIRA)" <ji...@apache.org> on 2016/12/05 09:08:59 UTC

[jira] [Reopened] (SPARK-18712) keep the order of sql expression and support short circuit

     [ https://issues.apache.org/jira/browse/SPARK-18712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

yahsuan, chang reopened SPARK-18712:
------------------------------------

Thanks for your reply, but I still have some questions.

In my code, all functions are deterministic; there is no randomness in them.
Did I misunderstand something?

Also, for optimization purposes, I think it would be helpful to let users hint the order of operations, since users may have insight into their data.

For example, a user could apply a quick filter f1 to discard most of the data first, and then apply a slow filter f2 for the more complicated filtering.
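
A minimal sketch of that idea in plain Python, without Spark. The predicates `f1` and `f2`, the wrapper `f1_then_f2`, and the `calls` counter are hypothetical stand-ins, not code from this issue. It only shows that folding both checks into one function lets Python's `and` skip the slow check for rows the quick check rejects.

```python
# Hypothetical stand-ins for the "quick" and "slow" filters; not from the issue.
calls = {"f1": 0, "f2": 0}


def f1(x):
    """Cheap predicate that rejects most rows."""
    calls["f1"] += 1
    return x % 100 == 0


def f2(x):
    """Placeholder for an expensive predicate."""
    calls["f2"] += 1
    return x % 300 == 0


def f1_then_f2(x):
    # Python's `and` evaluates f2 only when f1 returned True.
    return f1(x) and f2(x)


kept = [x for x in range(1000) if f1_then_f2(x)]
print(kept)   # rows that pass both predicates
print(calls)  # how often each predicate actually ran
```

Wrapping a function like `f1_then_f2` in a single `F.udf(..., T.BooleanType())` should keep the ordering inside Python, at the cost of making both predicates one opaque call from the optimizer's point of view.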

I tried the following code:

{code}
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)


def f(x):
    print('f', x)
    return True if x in (2, 3) else False


def g(x):
    print('g', x)
    return True if x % 2 == 0 else False

ff = F.udf(f, T.BooleanType())
gg = F.udf(g, T.BooleanType())

df = sqlc.range(5)
df = df.where(ff('id')).where(gg('id'))
df.show()
{code}

and got different output from Spark 1.5.2 and Spark 2.0.2:

{code}
# spark 1.5.2, function g is invoked only for 2 and 3
('f', 0)
('f', 3)
('f', 4)
('g', 3)
('f', 1)
('f', 2)
('g', 2)
+---+
| id|
+---+
|  2|
+---+
{code}

{code}
# spark 2.0.2, function g is invoked for all data
('f', 0)
('g', 0)
('f', 2)
('g', 2)
('f', 1)
('g', 1)
('f', 3)
('g', 3)
('f', 4)
('g', 4)
+---+
| id|
+---+
|  2|
+---+
{code}
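
The two call patterns above can be imitated in plain Python. This is only a sketch of the observed behavior, not Spark's actual implementation: `filter_chained`, `filter_batched`, and `g_inputs` are hypothetical helpers, and `f` and `g` mirror the functions above but record their calls in a list instead of printing.

```python
calls = []


def f(x):
    calls.append(("f", x))
    return x in (2, 3)


def g(x):
    calls.append(("g", x))
    return x % 2 == 0


def filter_chained(rows):
    """g runs only on rows that already passed f (the 1.5.2-style pattern)."""
    return [x for x in rows if f(x) and g(x)]


def filter_batched(rows):
    """Both predicates run on every row, then the results are combined
    (the 2.0.2-style pattern)."""
    evaluated = [(x, f(x), g(x)) for x in rows]
    return [x for x, passed_f, passed_g in evaluated if passed_f and passed_g]


def g_inputs(fn, rows):
    """Run fn and return (filtered rows, inputs that g was called with)."""
    calls.clear()
    result = fn(rows)
    return result, [x for name, x in calls if name == "g"]


chained = g_inputs(filter_chained, range(5))
batched = g_inputs(filter_batched, range(5))
print(chained)
print(batched)
```

Both strategies keep the same row, so the difference is visible only in how often `g` runs and in any side effects or exceptions it produces.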

I am very curious about the reason for, or the trade-off behind, this change of behavior.

> keep the order of sql expression and support short circuit
> ----------------------------------------------------------
>
>                 Key: SPARK-18712
>                 URL: https://issues.apache.org/jira/browse/SPARK-18712
>             Project: Spark
>          Issue Type: Wish
>          Components: SQL
>    Affects Versions: 2.0.2
>         Environment: Ubuntu 16.04
>            Reporter: yahsuan, chang
>
> The following Python code fails with Spark 2.0.2 but works with Spark 1.5.2:
> {code}
> # a.py
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> table = {5: True, 6: False}
> df = sqlc.range(10)
> df = df.where(df['id'].isin(5, 6))
> f = F.udf(lambda x: table[x], T.BooleanType())
> df = df.where(f(df['id']))
> # df.explain(True)
> print(df.count())
> {code}
> Here is the exception:
> {code}
> KeyError: 0
> {code}
> I guess the problem is related to the order in which the SQL expressions are evaluated.
> The following are the explain outputs of the two Spark versions:
> {code}
> # explain of spark 2.0.2
> == Parsed Logical Plan ==
> Filter <lambda>(id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
>    +- Range (0, 10, step=1, splits=Some(4))
> == Analyzed Logical Plan ==
> id: bigint
> Filter <lambda>(id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
>    +- Range (0, 10, step=1, splits=Some(4))
> == Optimized Logical Plan ==
> Filter (id#0L IN (5,6) && <lambda>(id#0L))
> +- Range (0, 10, step=1, splits=Some(4))
> == Physical Plan ==
> *Project [id#0L]
> +- *Filter (id#0L IN (5,6) && pythonUDF0#5)
>    +- BatchEvalPython [<lambda>(id#0L)], [id#0L, pythonUDF0#5]
>       +- *Range (0, 10, step=1, splits=Some(4))
> {code}
> {code}
> # explain of spark 1.5.2
> == Parsed Logical Plan ==
> 'Project [*,PythonUDF#<lambda>(id#0L) AS sad#1]
>  Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
>   LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Analyzed Logical Plan ==
> id: bigint, sad: int
> Project [id#0L,sad#1]
>  Project [id#0L,pythonUDF#2 AS sad#1]
>   EvaluatePython PythonUDF#<lambda>(id#0L), pythonUDF#2
>    Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
>     LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Optimized Logical Plan ==
> Project [id#0L,pythonUDF#2 AS sad#1]
>  EvaluatePython PythonUDF#<lambda>(id#0L), pythonUDF#2
>   Filter id#0L IN (5,6)
>    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Physical Plan ==
> TungstenProject [id#0L,pythonUDF#2 AS sad#1]
>  !BatchPythonEvaluation PythonUDF#<lambda>(id#0L), [id#0L,pythonUDF#2]
>   Filter id#0L IN (5,6)
>    Scan PhysicalRDD[id#0L]
> Code Generation: true
> {code}
> Also, I am not sure whether SQL expressions support short-circuit evaluation, so I ran the following experiment:
> {code}
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> def f(x):
>     print('in f')
>     return True
> f_udf = F.udf(f, T.BooleanType())
> df = sqlc.createDataFrame([(1, 2)], schema=['a', 'b'])
> df = df.where(f_udf('a') | f_udf('b'))
> df.show()
> {code}
> and I got the following output for both Spark 1.5.2 and Spark 2.0.2:
> {code}
> in f
> in f
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  2|
> +---+---+
> {code}
> There is only one row in the DataFrame df, but the function f was called twice, so I guess there is no short-circuit evaluation.
> This result seems to conflict with #SPARK-1461.
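
For the `KeyError: 0` in the quoted description, one defensive option is to make the lookup safe for ids outside `table`, so the function no longer depends on the `isin` filter running first. This is a sketch in plain Python, not a fix for the ordering itself. The name `lookup` is hypothetical, and treating unknown ids as `False` is an assumption about the intended semantics.

```python
table = {5: True, 6: False}


def lookup(x):
    # dict.get returns the default instead of raising KeyError for ids
    # the function may see before the isin filter has been applied.
    return table.get(x, False)


# Simulate the UDF being called on every id in range(10).
print([x for x in range(10) if lookup(x)])
```

The same function could be passed to `F.udf(lookup, T.BooleanType())` in place of the lambda.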


