Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2016/07/13 20:29:20 UTC
[jira] [Commented] (SPARK-16418) DataFrame.filter fails if it references a window function
[ https://issues.apache.org/jira/browse/SPARK-16418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15375680#comment-15375680 ]
Dongjoon Hyun commented on SPARK-16418:
---------------------------------------
Hi, [~erikwright].
I think the following is what you were trying to do. If I missed something, please let me know. (The following is the result on the current master, 2.1.0.)
{code}
>>> from pyspark.sql import types
>>> from pyspark.sql import Window
>>> from pyspark.sql import functions
>>>
>>> schema = types.StructType([types.StructField('id', types.IntegerType(), False),types.StructField('state', types.StringType(), True),types.StructField('seq', types.IntegerType(), False)])
>>> df = spark.createDataFrame([(1, 'hello', 1),(1, 'world', 2),(1,'world',3)], schema)
>>> df2 = df.withColumn("c", functions.lag('state').over(Window.partitionBy('id').orderBy('seq').rowsBetween(-1,-1)))
>>> df2.filter(df2["c"] != 'world').show()
+---+-----+---+-----+
| id|state|seq|    c|
+---+-----+---+-----+
|  1|world|  2|hello|
+---+-----+---+-----+
{code}
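For reference, the workaround described in the report (materializing the window expression as a column, filtering on that column, then dropping it) can be sketched on the same data as follows; the `prev` and `df3` names are only illustrative:
{code}
>>> # Workaround from the report: materialize the window expression as a column first,
>>> # then filter on the materialized column and drop it afterwards.
>>> prev = functions.lag('state').over(Window.partitionBy('id').orderBy('seq').rowsBetween(-1,-1))
>>> df3 = df.withColumn('previous_state', prev)
>>> df3.filter((df3['state'] == 'world') & (df3['previous_state'] != 'world')).drop('previous_state').show()
+---+-----+---+
| id|state|seq|
+---+-----+---+
|  1|world|  2|
+---+-----+---+
{code}
This mirrors steps In [13] through In [18] of the report and produces the same single-row result there.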
> DataFrame.filter fails if it references a window function
> ---------------------------------------------------------
>
> Key: SPARK-16418
> URL: https://issues.apache.org/jira/browse/SPARK-16418
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Erik Wright
>
> I'm using DataFrames in Python. If I build up a column expression that includes a window function and then filter on it, the resulting DataFrame cannot be evaluated.
> If I first add that column expression to the DataFrame as a column (or add the sub-expression that includes the window function as a column), the filter works. This works even if I later drop the added column.
> It seems like this shouldn't be required. In the worst case, the platform should be able to do this for me under the hood when/if necessary.
> {code:none}
> In [1]: from pyspark.sql import types
> In [2]: from pyspark.sql import Window
> In [3]: from pyspark.sql import functions
> In [4]: schema = types.StructType([types.StructField('id', types.IntegerType(), False),
> ...: types.StructField('state', types.StringType(), True),
> ...: types.StructField('seq', types.IntegerType(), False)])
> In [5]: original_data_frame = sc.sql.createDataFrame([(1, 'hello', 1),(1, 'world', 2),(1,'world',3)], schema)
> In [6]: previous_state = functions.lag('state').over(Window.partitionBy('id').orderBy('seq').rowsBetween(-1,-1))
> In [7]: filter_condition = (original_data_frame['state'] == 'world') & (previous_state != 'world')
> In [8]: data_frame = original_data_frame.withColumn('filter_condition', filter_condition)
> In [9]: data_frame.show()
> +---+-----+---+----------------+
> | id|state|seq|filter_condition|
> +---+-----+---+----------------+
> |  1|hello|  1|           false|
> |  1|world|  2|            true|
> |  1|world|  3|           false|
> +---+-----+---+----------------+
> In [10]: data_frame = data_frame.filter(data_frame['filter_condition']).drop('filter_condition')
> In [11]: data_frame.explain()
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0,state#1,seq#2]
> :     +- Filter (((isnotnull(state#1) && isnotnull(_we0#6)) && (state#1 = world)) && NOT (_we0#6 = world))
> :        +- INPUT
> +- Window [lag(state#1, 1, null) windowspecdefinition(id#0, seq#2 ASC, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS _we0#6], [id#0], [seq#2 ASC]
>    +- WholeStageCodegen
>       :  +- Sort [id#0 ASC,seq#2 ASC], false, 0
>       :     +- INPUT
>       +- Exchange hashpartitioning(id#0, 200), None
>          +- Scan ExistingRDD[id#0,state#1,seq#2]
> In [12]: data_frame.show()
> +---+-----+---+
> | id|state|seq|
> +---+-----+---+
> |  1|world|  2|
> +---+-----+---+
> In [13]: data_frame = original_data_frame.withColumn('previous_state', previous_state)
> In [14]: data_frame.show()
> +---+-----+---+--------------+
> | id|state|seq|previous_state|
> +---+-----+---+--------------+
> |  1|hello|  1|          null|
> |  1|world|  2|         hello|
> |  1|world|  3|         world|
> +---+-----+---+--------------+
> In [15]: filter_condition = (data_frame['state'] == 'world') & (data_frame['previous_state'] != 'world')
> In [16]: data_frame = data_frame.filter(filter_condition).drop('previous_state')
> In [17]: data_frame.explain()
> == Physical Plan ==
> WholeStageCodegen
> :  +- Project [id#0,state#1,seq#2]
> :     +- Filter (((isnotnull(state#1) && isnotnull(previous_state#12)) && (state#1 = world)) && NOT (previous_state#12 = world))
> :        +- INPUT
> +- Window [lag(state#1, 1, null) windowspecdefinition(id#0, seq#2 ASC, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) AS previous_state#12], [id#0], [seq#2 ASC]
>    +- WholeStageCodegen
>       :  +- Sort [id#0 ASC,seq#2 ASC], false, 0
>       :     +- INPUT
>       +- Exchange hashpartitioning(id#0, 200), None
>          +- Scan ExistingRDD[id#0,state#1,seq#2]
> In [18]: data_frame.show()
> +---+-----+---+
> | id|state|seq|
> +---+-----+---+
> |  1|world|  2|
> +---+-----+---+
> In [19]: filter_condition = (original_data_frame['state'] == 'world') & (previous_state != 'world')
> In [20]: data_frame = original_data_frame.filter(filter_condition)
> In [21]: data_frame.explain()
> == Physical Plan ==
> WholeStageCodegen
> :  +- Filter ((isnotnull(state#1) && (state#1 = world)) && NOT (lag(state#1, 1, null) windowspecdefinition(id#0, seq#2 ASC, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) = world))
> :     +- INPUT
> +- Scan ExistingRDD[id#0,state#1,seq#2]
> In [22]: data_frame.show()
> ---------------------------------------------------------------------------
> Py4JJavaError Traceback (most recent call last)
> /Users/erikwright/src/starscream/bin/starscream in <module>()
> ----> 1 data_frame.show()
> /Users/erikwright/spark-2.0.0-preview-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in show(self, n, truncate)
> 271 +---+-----+
> 272 """
> --> 273 print(self._jdf.showString(n, truncate))
> 274
> 275 def __repr__(self):
> /Users/erikwright/spark-2.0.0-preview-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
> 931 answer = self.gateway_client.send_command(command)
> 932 return_value = get_return_value(
> --> 933 answer, self.gateway_client, self.target_id, self.name)
> 934
> 935 for temp_arg in temp_args:
> /Users/erikwright/spark-2.0.0-preview-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
> 55 def deco(*a, **kw):
> 56 try:
> ---> 57 return f(*a, **kw)
> 58 except py4j.protocol.Py4JJavaError as e:
> 59 s = e.java_exception.toString()
> /Users/erikwright/spark-2.0.0-preview-bin-hadoop2.7/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
> 310 raise Py4JJavaError(
> 311 "An error occurred while calling {0}{1}{2}.\n".
> --> 312 format(target_id, ".", name), value)
> 313 else:
> 314 raise Py4JError(
> Py4JJavaError: An error occurred while calling o128.showString.
> : java.lang.UnsupportedOperationException: Cannot evaluate expression: lag(input[1, string], 1, null) windowspecdefinition(input[0, int], input[2, int] ASC, ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING)
> at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:220)
> at org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:288)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:102)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:102)
> at org.apache.spark.sql.catalyst.expressions.BinaryExpression.nullSafeCodeGen(Expression.scala:452)
> at org.apache.spark.sql.catalyst.expressions.BinaryExpression.defineCodeGen(Expression.scala:435)
> at org.apache.spark.sql.catalyst.expressions.EqualTo.doGenCode(predicates.scala:429)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:102)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:102)
> at org.apache.spark.sql.catalyst.expressions.UnaryExpression.nullSafeCodeGen(Expression.scala:363)
> at org.apache.spark.sql.catalyst.expressions.UnaryExpression.defineCodeGen(Expression.scala:347)
> at org.apache.spark.sql.catalyst.expressions.Not.doGenCode(predicates.scala:103)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
> at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:102)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:102)
> at org.apache.spark.sql.execution.FilterExec.org$apache$spark$sql$execution$FilterExec$$genPredicate$1(basicPhysicalOperators.scala:127)
> at org.apache.spark.sql.execution.FilterExec$$anonfun$12.apply(basicPhysicalOperators.scala:169)
> at org.apache.spark.sql.execution.FilterExec$$anonfun$12.apply(basicPhysicalOperators.scala:153)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:153)
> at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:153)
> at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:218)
> at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:244)
> at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:218)
> at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:113)
> at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
> at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:78)
> at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:79)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:304)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:343)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
> at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
> at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
> at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
> at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2122)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2436)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2121)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2128)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1862)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1861)
> at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2466)
> at org.apache.spark.sql.Dataset.head(Dataset.scala:1861)
> at org.apache.spark.sql.Dataset.take(Dataset.scala:2078)
> at org.apache.spark.sql.Dataset.showString(Dataset.scala:240)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:211)
> at java.lang.Thread.run(Thread.java:745)
> In [23]:
> {code}