Posted to issues@spark.apache.org by "Andrew Murphy (Jira)" <ji...@apache.org> on 2022/02/11 02:16:00 UTC

[jira] [Commented] (SPARK-37954) old columns should not be available after select or drop

    [ https://issues.apache.org/jira/browse/SPARK-37954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490630#comment-17490630 ] 

Andrew Murphy commented on SPARK-37954:
---------------------------------------

 Hi all,

Super excited to report on this one; this is my first time attempting to contribute to Spark, so bear with me!

It looks like df.drop("id") is not supposed to raise an error: per the [documentation|https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.drop.html], "This is a no-op if schema doesn’t contain the given column name(s)." So that part is intended behavior.
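To sanity-check the documented no-op, here is a quick sketch against a toy DataFrame (the app name and column names are just mine):

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('drop_noop_check').getOrCreate()
df = spark.range(5).withColumnRenamed("id", "id2")

# "id" is no longer in the schema, but drop() silently ignores it
assert df.drop("id").columns == ["id2"]
{code}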

I was sure the filter case was a bug, though. Normally, filtering on a nonexistent column _does_ throw an AnalysisException, but in one very specific case Catalyst actually propagates a reference that should have been dropped.
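For contrast, this is the behavior I would normally expect (reusing the SparkSession from above; the exact error message differs across versions):

{code:python}
from pyspark.sql.functions import col

# A column that never existed anywhere in the lineage fails analysis as expected
try:
    spark.range(5).filter(col("no_such_col") != 2).count()
except Exception as e:
    print(type(e).__name__)  # prints "AnalysisException"
{code}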

*Code*

 
{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName('available_columns').getOrCreate()
df = spark.createDataFrame([{"oldcol": 1}, {"oldcol": 2}])
df = df.withColumnRenamed('oldcol', 'newcol')
# Filter on the old, renamed-away column name: no error is raised
df = df.filter(col("oldcol") != 2)
df.show()
df.explain("extended") {code}
*Output*

{code:java}
+------+
|newcol|
+------+
|     1|
+------+

== Parsed Logical Plan ==
'Filter NOT ('oldcol = 2)
+- Project [oldcol#6L AS newcol#8L]
   +- LogicalRDD [oldcol#6L], false

== Analyzed Logical Plan ==
newcol: bigint
Project [newcol#8L]
+- Filter NOT (oldcol#6L = cast(2 as bigint))
   +- Project [oldcol#6L AS newcol#8L, oldcol#6L]
      +- LogicalRDD [oldcol#6L], false

== Optimized Logical Plan ==
Project [oldcol#6L AS newcol#8L]
+- Filter (isnotnull(oldcol#6L) AND NOT (oldcol#6L = 2))
   +- LogicalRDD [oldcol#6L], false

== Physical Plan ==
*(1) Project [oldcol#6L AS newcol#8L]
+- *(1) Filter (isnotnull(oldcol#6L) AND NOT (oldcol#6L = 2))
   +- *(1) Scan ExistingRDD[oldcol#6L] {code}
As you can see, in the Analysis step Catalyst propagates oldcol#6L into the Project operator for seemingly no reason. It turns out the reason comes from SPARK-24781 ([PR|https://github.com/apache/spark/pull/21745]), where analysis used to fail on this construction:
{code:scala}
val df = Seq(("test1", 0), ("test2", 1)).toDF("name", "id")
df.select(df("name")).filter(df("id") === 0).show()
{code}
{noformat}
org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#6 missing from name#5 in operator !Filter (id#6 = 0).;;
!Filter (id#6 = 0)
   +- AnalysisBarrier
      +- Project [name#5]
         +- Project [_1#2 AS name#5, _2#3 AS id#6]
            +- LocalRelation [_1#2, _2#3]{noformat}
So it looks like that PR propagates references for the very specific case of a Filter directly on top of a Project, i.e. when we select and filter in the same chained expression, but unfortunately the analyzer doesn't differentiate between
{code:scala}
df.select(df("name")).filter(df("id") === 0).show()
{code}
and
{code:scala}
val df2 = df.select(df("name"))
df2.filter(df("id") === 0).show()
{code}
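To make the distinction concrete, here is roughly the same pair in PySpark (a sketch; df2 is just my name for the intermediate DataFrame, and the claim about the second form is based on the repro above):

{code:python}
df = spark.createDataFrame([("test1", 0), ("test2", 1)], ["name", "id"])

# Chained select + filter in one expression: the case SPARK-24781 set out to fix
df.select(df["name"]).filter(df["id"] == 0).show()

# Split across two statements: per the behavior above this also resolves,
# even though "id" is no longer in df2's schema
df2 = df.select(df["name"])
df2.filter(df["id"] == 0).show()
{code}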
[~viirya] I see you were the one who fixed this problem in the first place. Any thoughts about how this could be solved? Unfortunately, creating a fix for this one is beyond my capabilities right now, but I'm trying to learn!

 

> old columns should not be available after select or drop
> --------------------------------------------------------
>
>                 Key: SPARK-37954
>                 URL: https://issues.apache.org/jira/browse/SPARK-37954
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.1
>            Reporter: Jean Bon
>            Priority: Major
>
>  
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col as col
> spark = SparkSession.builder.appName('available_columns').getOrCreate()
> df = spark.range(5).select((col("id")+10).alias("id2"))
> assert df.columns==["id2"] #OK
> try:
>     df.select("id")
>     error_raise = False
> except:
>     error_raise = True
> assert error_raise #OK
> df = df.drop("id") #should raise an error
> df.filter(col("id")!=2).count() #returns 4, should raise an error
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org