Posted to issues@spark.apache.org by "Vinod KC (Jira)" <ji...@apache.org> on 2020/08/17 06:35:00 UTC

[jira] [Updated] (SPARK-32635) When pyspark.sql.functions.lit() function is used with dataframe cache, it returns wrong result

     [ https://issues.apache.org/jira/browse/SPARK-32635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vinod KC updated SPARK-32635:
-----------------------------
    Description: 
When a column created with pyspark.sql.functions.lit() is used together with a cached DataFrame, selecting that column returns a wrong result.

e.g. lit() function with cache() function.
 -----------------------------------
{code:java}
from pyspark.sql import Row
from pyspark.sql import functions as F

df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2)))
df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1)))
df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2)))
df_23 = df_2.union(df_3)
df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2)))

sel_col3 = df_23.select('col3', 'col2')
df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner")
df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner").cache() 
finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9)
finaldf.show()
finaldf.select('col2').show() #Wrong result
{code}
 

Output
 -----------
{code:java}
>>> finaldf.show()
+----+----+----+
|col2|col3|col1|
+----+----+----+
| 2| 9| b|
+----+----+----+
>>> finaldf.select('col2').show() #Wrong result, instead of 2, got 1
+----+
|col2|
+----+
| 1|
+----+
{code}
 lit() function without cache() function.
{code:java}
from pyspark.sql import Row
from pyspark.sql import functions as F

df_1 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b'}]).withColumn("col2", F.lit(str(2)))
df_2 = spark.createDataFrame(Row(**x) for x in [{'col1': 'a', 'col3': 8}]).withColumn("col2", F.lit(str(1)))
df_3 = spark.createDataFrame(Row(**x) for x in [{'col1': 'b', 'col3': 9}]).withColumn("col2", F.lit(str(2)))
df_23 = df_2.union(df_3)
df_4 = spark.createDataFrame(Row(**x) for x in [{'col3': 9}]).withColumn("col2", F.lit(str(2)))

sel_col3 = df_23.select('col3', 'col2')
df_4 = df_4.join(sel_col3, on=['col3', 'col2'], how = "inner")
df_23_a = df_23.join(df_1, on=["col1", 'col2'], how="inner")
finaldf = df_23_a.join(df_4, on=['col2', 'col3'], how='left').filter(F.col('col3') == 9)
finaldf.show() 
finaldf.select('col2').show() #Correct result
{code}
 

Output
 -----------
{code:java}
>>> finaldf.show()
+----+----+----+
|col2|col3|col1|
+----+----+----+
| 2| 9| b|
+----+----+----+
>>> finaldf.select('col2').show() #Correct result, when df_23_a is not cached 
+----+
|col2|
+----+
| 2|
+----+
{code}
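
For reference, the expected value of col2 can be worked out by hand. The following is a minimal plain-Python sketch that models the same joins on lists of dicts. It does not use Spark, and the helpers inner_join and left_join are hypothetical names introduced only for this illustration. Assuming cache() is not supposed to change query semantics, both variants above should agree with it.

```python
# Plain-Python model of the joins in the report (no Spark involved).
# Rows are dicts; inner_join and left_join are hypothetical helpers.

def inner_join(left, right, on):
    """Merge every left/right pair of rows that agrees on all `on` keys."""
    return [{**r, **l} for l in left for r in right
            if all(l[k] == r[k] for k in on)]

def left_join(left, right, on):
    """Like inner_join, but keep left rows that have no match on the right."""
    out = []
    for l in left:
        matches = [{**r, **l} for r in right
                   if all(l[k] == r[k] for k in on)]
        out.extend(matches or [dict(l)])
    return out

# Same data as the report; col2 is the literal added via F.lit(str(n)).
df_1 = [{"col1": "b", "col2": "2"}]
df_2 = [{"col1": "a", "col3": 8, "col2": "1"}]
df_3 = [{"col1": "b", "col3": 9, "col2": "2"}]
df_23 = df_2 + df_3  # union() keeps all rows, so list concatenation models it
df_4 = [{"col3": 9, "col2": "2"}]

sel_col3 = [{"col3": r["col3"], "col2": r["col2"]} for r in df_23]
df_4 = inner_join(df_4, sel_col3, on=["col3", "col2"])
df_23_a = inner_join(df_23, df_1, on=["col1", "col2"])
finaldf = [r for r in left_join(df_23_a, df_4, on=["col2", "col3"])
           if r["col3"] == 9]

expected_col2 = [r["col2"] for r in finaldf]
print(expected_col2)  # ['2']
```

Only the row with col1 = 'b' and col3 = 9 survives the joins and the filter, so col2 should be '2', which matches the uncached output and not the '1' returned in the cached case.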
 

> When pyspark.sql.functions.lit() function is used with dataframe cache, it returns wrong result
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-32635
>                 URL: https://issues.apache.org/jira/browse/SPARK-32635
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 3.0.0
>            Reporter: Vinod KC
>            Priority: Major


